mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
CloudWatch Logs: Fix running logs queries with expressions (#65306)
This commit is contained in:
parent
09078b14e1
commit
92965578b3
@ -103,6 +103,61 @@ describe('DataSourceWithBackend', () => {
|
||||
`);
|
||||
});
|
||||
|
||||
test('correctly creates expression queries', () => {
|
||||
const { mock, ds } = createMockDatasource();
|
||||
ds.query({
|
||||
maxDataPoints: 10,
|
||||
intervalMs: 5000,
|
||||
targets: [{ refId: 'A' }, { refId: 'B', datasource: { type: '__expr__' } }],
|
||||
dashboardUID: 'dashA',
|
||||
panelId: 123,
|
||||
queryGroupId: 'abc',
|
||||
} as DataQueryRequest);
|
||||
|
||||
const args = mock.calls[0][0];
|
||||
|
||||
expect(mock.calls.length).toBe(1);
|
||||
expect(args).toMatchInlineSnapshot(`
|
||||
{
|
||||
"data": {
|
||||
"queries": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "dummy",
|
||||
"uid": "abc",
|
||||
},
|
||||
"datasourceId": 1234,
|
||||
"intervalMs": 5000,
|
||||
"maxDataPoints": 10,
|
||||
"queryCachingTTL": undefined,
|
||||
"refId": "A",
|
||||
},
|
||||
{
|
||||
"datasource": {
|
||||
"name": "Expression",
|
||||
"type": "__expr__",
|
||||
"uid": "__expr__",
|
||||
},
|
||||
"refId": "B",
|
||||
},
|
||||
],
|
||||
},
|
||||
"headers": {
|
||||
"X-Dashboard-Uid": "dashA",
|
||||
"X-Datasource-Uid": "abc",
|
||||
"X-Grafana-From-Expr": "true",
|
||||
"X-Panel-Id": "123",
|
||||
"X-Plugin-Id": "dummy",
|
||||
"X-Query-Group-Id": "abc",
|
||||
},
|
||||
"hideFromInspector": false,
|
||||
"method": "POST",
|
||||
"requestId": undefined,
|
||||
"url": "/api/ds/query?expression=true",
|
||||
}
|
||||
`);
|
||||
});
|
||||
|
||||
test('should apply template variables only for the current data source', () => {
|
||||
const { mock, ds } = createMockDatasource();
|
||||
ds.applyTemplateVariables = jest.fn();
|
||||
|
@ -78,6 +78,7 @@ enum PluginRequestHeaders {
|
||||
DashboardUID = 'X-Dashboard-Uid', // mainly useful for debuging slow queries
|
||||
PanelID = 'X-Panel-Id', // mainly useful for debuging slow queries
|
||||
QueryGroupID = 'X-Query-Group-Id', // mainly useful to find related queries with query chunking
|
||||
FromExpression = 'X-Grafana-From-Expr', // used by datasources to identify expression queries
|
||||
}
|
||||
|
||||
/**
|
||||
@ -197,14 +198,16 @@ class DataSourceWithBackend<
|
||||
});
|
||||
}
|
||||
|
||||
let url = '/api/ds/query';
|
||||
if (hasExpr) {
|
||||
url += '?expression=true';
|
||||
}
|
||||
|
||||
const headers: Record<string, string> = {};
|
||||
headers[PluginRequestHeaders.PluginID] = Array.from(pluginIDs).join(', ');
|
||||
headers[PluginRequestHeaders.DatasourceUID] = Array.from(dsUIDs).join(', ');
|
||||
|
||||
let url = '/api/ds/query';
|
||||
if (hasExpr) {
|
||||
headers[PluginRequestHeaders.FromExpression] = 'true';
|
||||
url += '?expression=true';
|
||||
}
|
||||
|
||||
if (request.dashboardUID) {
|
||||
headers[PluginRequestHeaders.DashboardUID] = request.dashboardUID;
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ func (m *TracingHeaderMiddleware) applyHeaders(ctx context.Context, req backend.
|
||||
return
|
||||
}
|
||||
|
||||
var headersList = []string{query.HeaderQueryGroupID, query.HeaderPanelID, query.HeaderDashboardUID, query.HeaderDatasourceUID, `X-Grafana-Org-Id`}
|
||||
var headersList = []string{query.HeaderQueryGroupID, query.HeaderPanelID, query.HeaderDashboardUID, query.HeaderDatasourceUID, query.HeaderFromExpression, `X-Grafana-Org-Id`}
|
||||
|
||||
for _, headerName := range headersList {
|
||||
gotVal := reqCtx.Req.Header.Get(headerName)
|
||||
|
@ -113,6 +113,7 @@ func TestTracingHeaderMiddleware(t *testing.T) {
|
||||
req.Header[`X-Grafana-Org-Id`] = []string{"1"}
|
||||
req.Header[`X-Panel-Id`] = []string{"2"}
|
||||
req.Header[`X-Query-Group-Id`] = []string{"d26e337d-cb53-481a-9212-0112537b3c1a"}
|
||||
req.Header[`X-Grafana-From-Expr`] = []string{"true"}
|
||||
|
||||
pluginCtx := backend.PluginContext{
|
||||
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
|
||||
@ -133,12 +134,13 @@ func TestTracingHeaderMiddleware(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, cdt.QueryDataReq.GetHTTPHeaders(), 5)
|
||||
require.Len(t, cdt.QueryDataReq.GetHTTPHeaders(), 6)
|
||||
require.Equal(t, `lN53lOcVk`, cdt.QueryDataReq.GetHTTPHeader(`X-Dashboard-Uid`))
|
||||
require.Equal(t, `aIyC_OcVz`, cdt.QueryDataReq.GetHTTPHeader(`X-Datasource-Uid`))
|
||||
require.Equal(t, `1`, cdt.QueryDataReq.GetHTTPHeader(`X-Grafana-Org-Id`))
|
||||
require.Equal(t, `2`, cdt.QueryDataReq.GetHTTPHeader(`X-Panel-Id`))
|
||||
require.Equal(t, `d26e337d-cb53-481a-9212-0112537b3c1a`, cdt.QueryDataReq.GetHTTPHeader(`X-Query-Group-Id`))
|
||||
require.Equal(t, `true`, cdt.QueryDataReq.GetHTTPHeader(`X-Grafana-From-Expr`))
|
||||
})
|
||||
|
||||
t.Run("tracing headers are set for health check", func(t *testing.T) {
|
||||
@ -156,12 +158,13 @@ func TestTracingHeaderMiddleware(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, cdt.CheckHealthReq.GetHTTPHeaders(), 5)
|
||||
require.Len(t, cdt.CheckHealthReq.GetHTTPHeaders(), 6)
|
||||
require.Equal(t, `lN53lOcVk`, cdt.CheckHealthReq.GetHTTPHeader(`X-Dashboard-Uid`))
|
||||
require.Equal(t, `aIyC_OcVz`, cdt.CheckHealthReq.GetHTTPHeader(`X-Datasource-Uid`))
|
||||
require.Equal(t, `1`, cdt.CheckHealthReq.GetHTTPHeader(`X-Grafana-Org-Id`))
|
||||
require.Equal(t, `2`, cdt.CheckHealthReq.GetHTTPHeader(`X-Panel-Id`))
|
||||
require.Equal(t, `d26e337d-cb53-481a-9212-0112537b3c1a`, cdt.CheckHealthReq.GetHTTPHeader(`X-Query-Group-Id`))
|
||||
require.Equal(t, `true`, cdt.CheckHealthReq.GetHTTPHeader(`X-Grafana-From-Expr`))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
@ -24,11 +24,12 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
HeaderPluginID = "X-Plugin-Id" // can be used for routing
|
||||
HeaderDatasourceUID = "X-Datasource-Uid" // can be used for routing/ load balancing
|
||||
HeaderDashboardUID = "X-Dashboard-Uid" // mainly useful for debuging slow queries
|
||||
HeaderPanelID = "X-Panel-Id" // mainly useful for debuging slow queries
|
||||
HeaderQueryGroupID = "X-Query-Group-Id" // mainly useful for finding related queries with query chunking
|
||||
HeaderPluginID = "X-Plugin-Id" // can be used for routing
|
||||
HeaderDatasourceUID = "X-Datasource-Uid" // can be used for routing/ load balancing
|
||||
HeaderDashboardUID = "X-Dashboard-Uid" // mainly useful for debuging slow queries
|
||||
HeaderPanelID = "X-Panel-Id" // mainly useful for debuging slow queries
|
||||
HeaderQueryGroupID = "X-Query-Group-Id" // mainly useful for finding related queries with query chunking
|
||||
HeaderFromExpression = "X-Grafana-From-Expr" // used by datasources to identify expression queries
|
||||
)
|
||||
|
||||
func ProvideService(
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/query"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/clients"
|
||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
|
||||
@ -160,7 +161,7 @@ func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDa
|
||||
to the query, but rather an ID is first returned. Following this, a client is expected to send requests along
|
||||
with the ID until the status of the query is complete, receiving (possibly partial) results each time. For
|
||||
queries made via dashboards and Explore, the logic of making these repeated queries is handled on the
|
||||
frontend, but because alerts are executed on the backend the logic needs to be reimplemented here.
|
||||
frontend, but because alerts and expressions are executed on the backend the logic needs to be reimplemented here.
|
||||
*/
|
||||
q := req.Queries[0]
|
||||
var model DataQueryJson
|
||||
@ -168,11 +169,12 @@ func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDa
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
|
||||
isLogAlertQuery := fromAlert && model.QueryMode == logsQueryMode
|
||||
|
||||
if isLogAlertQuery {
|
||||
return e.executeLogAlertQuery(ctx, req)
|
||||
_, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
|
||||
fromExpression := req.GetHTTPHeader(query.HeaderFromExpression) != ""
|
||||
isSyncLogQuery := (fromAlert || fromExpression) && model.QueryMode == logsQueryMode
|
||||
if isSyncLogQuery {
|
||||
return executeSyncLogQuery(ctx, e, req)
|
||||
}
|
||||
|
||||
var result *backend.QueryDataResponse
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/query"
|
||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/mocks"
|
||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
|
||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
|
||||
@ -192,7 +193,7 @@ func Test_CheckHealth(t *testing.T) {
|
||||
}, resp)
|
||||
})
|
||||
}
|
||||
func Test_executeLogAlertQuery(t *testing.T) {
|
||||
func Test_executeSyncLogQuery(t *testing.T) {
|
||||
origNewCWClient := NewCWClient
|
||||
t.Cleanup(func() {
|
||||
NewCWClient = origNewCWClient
|
||||
@ -254,6 +255,70 @@ func Test_executeLogAlertQuery(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, []string{"instance manager's region"}, sess.calledRegions)
|
||||
})
|
||||
|
||||
t.Run("with header", func(t *testing.T) {
|
||||
testcases := []struct {
|
||||
name string
|
||||
headers map[string]string
|
||||
called bool
|
||||
}{
|
||||
{
|
||||
"alert header",
|
||||
map[string]string{ngalertmodels.FromAlertHeaderName: "some value"},
|
||||
true,
|
||||
},
|
||||
{
|
||||
"expression header",
|
||||
map[string]string{fmt.Sprintf("http_%s", query.HeaderFromExpression): "some value"},
|
||||
true,
|
||||
},
|
||||
{
|
||||
"no header",
|
||||
map[string]string{},
|
||||
false,
|
||||
},
|
||||
}
|
||||
origExecuteSyncLogQuery := executeSyncLogQuery
|
||||
var syncCalled bool
|
||||
executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
syncCalled = true
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
for _, tc := range testcases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
syncCalled = false
|
||||
cli = fakeCWLogsClient{queryResults: cloudwatchlogs.GetQueryResultsOutput{Status: aws.String("Complete")}}
|
||||
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
||||
return DataSource{Settings: models.CloudWatchSettings{AWSDatasourceSettings: awsds.AWSDatasourceSettings{Region: "instance manager's region"}}}, nil
|
||||
})
|
||||
sess := fakeSessionCache{}
|
||||
|
||||
executor := newExecutor(im, newTestConfig(), &sess, featuremgmt.WithFeatures())
|
||||
_, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
|
||||
Headers: tc.headers,
|
||||
PluginContext: backend.PluginContext{DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}},
|
||||
Queries: []backend.DataQuery{
|
||||
{
|
||||
TimeRange: backend.TimeRange{From: time.Unix(0, 0), To: time.Unix(1, 0)},
|
||||
JSON: json.RawMessage(`{
|
||||
"queryMode": "Logs",
|
||||
"type": "logAction",
|
||||
"subtype": "StartQuery",
|
||||
"region": "default",
|
||||
"queryString": "fields @message"
|
||||
}`),
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tc.called, syncCalled)
|
||||
})
|
||||
}
|
||||
|
||||
executeSyncLogQuery = origExecuteSyncLogQuery
|
||||
})
|
||||
}
|
||||
|
||||
func TestQuery_ResourceRequest_DescribeLogGroups_with_CrossAccountQuerying(t *testing.T) {
|
||||
|
@ -18,7 +18,7 @@ const (
|
||||
alertPollPeriod = time.Second
|
||||
)
|
||||
|
||||
func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
resp := backend.NewQueryDataResponse()
|
||||
|
||||
for _, q := range req.Queries {
|
||||
@ -45,7 +45,7 @@ func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *back
|
||||
return nil, err
|
||||
}
|
||||
|
||||
getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, q, logsQuery)
|
||||
getQueryResultsOutput, err := e.syncQuery(ctx, logsClient, q, logsQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -73,7 +73,7 @@ func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *back
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
|
||||
func (e *cloudWatchExecutor) syncQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
|
||||
queryContext backend.DataQuery, logsQuery models.LogsQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) {
|
||||
startQueryOutput, err := e.executeStartQuery(ctx, logsClient, logsQuery, queryContext.TimeRange)
|
||||
if err != nil {
|
Loading…
Reference in New Issue
Block a user