mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Revert "CloudWatch Logs: Queries in an expression should run synchronously (#64443)"
This reverts commit 74436d31de
.
This commit is contained in:
parent
6f73777d59
commit
972e611f76
@ -21,8 +21,6 @@ var (
|
|||||||
logger = log.New("expr")
|
logger = log.New("expr")
|
||||||
)
|
)
|
||||||
|
|
||||||
const FromExpressionHeaderName = "FromExpression"
|
|
||||||
|
|
||||||
type QueryError struct {
|
type QueryError struct {
|
||||||
RefID string
|
RefID string
|
||||||
Err error
|
Err error
|
||||||
@ -229,7 +227,6 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
|
|||||||
},
|
},
|
||||||
Headers: dn.request.Headers,
|
Headers: dn.request.Headers,
|
||||||
}
|
}
|
||||||
req.Headers[FromExpressionHeaderName] = "true"
|
|
||||||
|
|
||||||
responseType := "unknown"
|
responseType := "unknown"
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -20,7 +20,6 @@ import (
|
|||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter"
|
||||||
"github.com/grafana/grafana/pkg/expr"
|
|
||||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||||
@ -156,7 +155,7 @@ func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDa
|
|||||||
to the query, but rather an ID is first returned. Following this, a client is expected to send requests along
|
to the query, but rather an ID is first returned. Following this, a client is expected to send requests along
|
||||||
with the ID until the status of the query is complete, receiving (possibly partial) results each time. For
|
with the ID until the status of the query is complete, receiving (possibly partial) results each time. For
|
||||||
queries made via dashboards and Explore, the logic of making these repeated queries is handled on the
|
queries made via dashboards and Explore, the logic of making these repeated queries is handled on the
|
||||||
frontend, but because alerts and expressions are executed on the backend the logic needs to be reimplemented here.
|
frontend, but because alerts are executed on the backend the logic needs to be reimplemented here.
|
||||||
*/
|
*/
|
||||||
q := req.Queries[0]
|
q := req.Queries[0]
|
||||||
var model DataQueryJson
|
var model DataQueryJson
|
||||||
@ -164,12 +163,11 @@ func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDa
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
|
_, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
|
||||||
_, fromExpression := req.Headers[expr.FromExpressionHeaderName]
|
isLogAlertQuery := fromAlert && model.QueryMode == logsQueryMode
|
||||||
isSyncLogQuery := (fromAlert || fromExpression) && model.QueryMode == logsQueryMode
|
|
||||||
if isSyncLogQuery {
|
if isLogAlertQuery {
|
||||||
return executeSyncLogQuery(ctx, e, req)
|
return e.executeLogAlertQuery(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
var result *backend.QueryDataResponse
|
var result *backend.QueryDataResponse
|
||||||
|
@ -17,7 +17,6 @@ import (
|
|||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
||||||
"github.com/grafana/grafana/pkg/expr"
|
|
||||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||||
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||||
@ -193,7 +192,7 @@ func Test_CheckHealth(t *testing.T) {
|
|||||||
}, resp)
|
}, resp)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
func Test_executeSyncLogQuery(t *testing.T) {
|
func Test_executeLogAlertQuery(t *testing.T) {
|
||||||
origNewCWClient := NewCWClient
|
origNewCWClient := NewCWClient
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
NewCWClient = origNewCWClient
|
NewCWClient = origNewCWClient
|
||||||
@ -255,70 +254,6 @@ func Test_executeSyncLogQuery(t *testing.T) {
|
|||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, []string{"instance manager's region"}, sess.calledRegions)
|
assert.Equal(t, []string{"instance manager's region"}, sess.calledRegions)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("with header", func(t *testing.T) {
|
|
||||||
testcases := []struct {
|
|
||||||
name string
|
|
||||||
headers map[string]string
|
|
||||||
called bool
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
"alert header",
|
|
||||||
map[string]string{ngalertmodels.FromAlertHeaderName: "some value"},
|
|
||||||
true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"expression header",
|
|
||||||
map[string]string{expr.FromExpressionHeaderName: "some value"},
|
|
||||||
true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"no header",
|
|
||||||
map[string]string{},
|
|
||||||
false,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
origExecuteSyncLogQuery := executeSyncLogQuery
|
|
||||||
var syncCalled bool
|
|
||||||
executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
|
||||||
syncCalled = true
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range testcases {
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
syncCalled = false
|
|
||||||
cli = fakeCWLogsClient{queryResults: cloudwatchlogs.GetQueryResultsOutput{Status: aws.String("Complete")}}
|
|
||||||
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
|
||||||
return DataSource{Settings: models.CloudWatchSettings{AWSDatasourceSettings: awsds.AWSDatasourceSettings{Region: "instance manager's region"}}}, nil
|
|
||||||
})
|
|
||||||
sess := fakeSessionCache{}
|
|
||||||
|
|
||||||
executor := newExecutor(im, newTestConfig(), &sess, featuremgmt.WithFeatures())
|
|
||||||
_, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
|
|
||||||
Headers: tc.headers,
|
|
||||||
PluginContext: backend.PluginContext{DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}},
|
|
||||||
Queries: []backend.DataQuery{
|
|
||||||
{
|
|
||||||
TimeRange: backend.TimeRange{From: time.Unix(0, 0), To: time.Unix(1, 0)},
|
|
||||||
JSON: json.RawMessage(`{
|
|
||||||
"queryMode": "Logs",
|
|
||||||
"type": "logAction",
|
|
||||||
"subtype": "StartQuery",
|
|
||||||
"region": "default",
|
|
||||||
"queryString": "fields @message"
|
|
||||||
}`),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Equal(t, tc.called, syncCalled)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
executeSyncLogQuery = origExecuteSyncLogQuery
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestQuery_ResourceRequest_DescribeLogGroups_with_CrossAccountQuerying(t *testing.T) {
|
func TestQuery_ResourceRequest_DescribeLogGroups_with_CrossAccountQuerying(t *testing.T) {
|
||||||
|
@ -18,7 +18,7 @@ const (
|
|||||||
alertPollPeriod = time.Second
|
alertPollPeriod = time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||||
resp := backend.NewQueryDataResponse()
|
resp := backend.NewQueryDataResponse()
|
||||||
|
|
||||||
for _, q := range req.Queries {
|
for _, q := range req.Queries {
|
||||||
@ -45,7 +45,7 @@ var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
getQueryResultsOutput, err := e.syncQuery(ctx, logsClient, q, logsQuery)
|
getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, q, logsQuery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -73,7 +73,7 @@ var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *cloudWatchExecutor) syncQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
|
func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
|
||||||
queryContext backend.DataQuery, logsQuery models.LogsQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) {
|
queryContext backend.DataQuery, logsQuery models.LogsQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) {
|
||||||
startQueryOutput, err := e.executeStartQuery(ctx, logsClient, logsQuery, queryContext.TimeRange)
|
startQueryOutput, err := e.executeStartQuery(ctx, logsClient, logsQuery, queryContext.TimeRange)
|
||||||
if err != nil {
|
if err != nil {
|
Loading…
Reference in New Issue
Block a user