CloudWatch: Add OpenSearch PPL and SQL support in Logs Insights (#97508)

CloudWatch: OpenSearch PPL and SQL support in Logs Insights

Co-authored-by: Kevin Yu <kevinwcyu@users.noreply.github.com>
Co-authored-by: Nathan Vērzemnieks <njvrzm@users.noreply.github.com>
This commit is contained in:
Ida Štambuk
2024-12-06 15:27:19 +01:00
committed by GitHub
parent 3d856dcb33
commit 2e342e5b1b
60 changed files with 7182 additions and 630 deletions

View File

@@ -16,6 +16,13 @@ const (
CloudWatchQueryModeMetrics CloudWatchQueryMode = "Metrics"
)
// Defines values for LogsQueryLanguage.
const (
	// LogsQueryLanguageCWLI is the CloudWatch Logs Insights query language
	// (the default when no language is specified on a query).
	LogsQueryLanguageCWLI LogsQueryLanguage = "CWLI"
	// LogsQueryLanguagePPL is the OpenSearch Piped Processing Language.
	LogsQueryLanguagePPL LogsQueryLanguage = "PPL"
	// LogsQueryLanguageSQL is the OpenSearch SQL query language.
	LogsQueryLanguageSQL LogsQueryLanguage = "SQL"
)
// Defines values for MetricEditorMode.
const (
MetricEditorModeN0 MetricEditorMode = 0
@@ -194,8 +201,9 @@ type CloudWatchLogsQuery struct {
LogGroupNames []string `json:"logGroupNames,omitempty"`
// Log groups to query
LogGroups []LogGroup `json:"logGroups,omitempty"`
QueryMode *CloudWatchQueryMode `json:"queryMode,omitempty"`
LogGroups []LogGroup `json:"logGroups,omitempty"`
QueryLanguage *LogsQueryLanguage `json:"queryLanguage,omitempty"`
QueryMode *CloudWatchQueryMode `json:"queryMode,omitempty"`
// Specify the query flavor
// TODO make this required and give it a default
@@ -325,6 +333,9 @@ type LogGroup struct {
Name string `json:"name"`
}
// LogsQueryLanguage defines model for LogsQueryLanguage. It identifies which
// query language a Logs Insights query is written in: CWLI, PPL, or SQL.
type LogsQueryLanguage string

// MetricEditorMode defines model for MetricEditorMode.
type MetricEditorMode int

View File

@@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
"github.com/grafana/grafana-plugin-sdk-go/backend"
@@ -20,6 +21,7 @@ import (
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/features"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
)
@@ -42,6 +44,45 @@ func (e *AWSError) Error() string {
return fmt.Sprintf("CloudWatch error: %s: %s", e.Code, e.Message)
}
// StartQueryInputWithLanguage copies the StartQueryInput struct from aws-sdk-go@v1.55.5
// (https://github.com/aws/aws-sdk-go/blob/7112c0a0c2d01713a9db2d57f0e5722225baf5b5/service/cloudwatchlogs/api.go#L19541)
// to add support for the new QueryLanguage parameter, which is unlikely to be backported
// since v1 of the aws-sdk-go is in maintenance mode. We've removed the comments for
// clarity. The field tags must stay identical to the SDK's so the request
// serializes exactly as a StartQueryInput plus the extra queryLanguage field.
type StartQueryInputWithLanguage struct {
	_ struct{} `type:"structure"`

	EndTime             *int64    `locationName:"endTime" type:"long" required:"true"`
	Limit               *int64    `locationName:"limit" min:"1" type:"integer"`
	LogGroupIdentifiers []*string `locationName:"logGroupIdentifiers" type:"list"`
	LogGroupName        *string   `locationName:"logGroupName" min:"1" type:"string"`
	LogGroupNames       []*string `locationName:"logGroupNames" type:"list"`
	QueryString         *string   `locationName:"queryString" type:"string" required:"true"`
	// QueryLanguage is the only change here from the original code.
	QueryLanguage *string `locationName:"queryLanguage" type:"string"`
	StartTime     *int64  `locationName:"startTime" type:"long" required:"true"`
}
// WithQueryLanguageFunc is the signature of a factory that, given a query
// language, produces an aws/request option to apply to a StartQuery call.
type WithQueryLanguageFunc func(language *dataquery.LogsQueryLanguage) func(*request.Request)

// WithQueryLanguage assigns the function to a variable in order to mock it in log_actions_test.go
var WithQueryLanguage WithQueryLanguageFunc = withQueryLanguage
// withQueryLanguage returns an aws/request option that replaces the request's
// *cloudwatchlogs.StartQueryInput params with a StartQueryInputWithLanguage
// carrying the given query language, so the maintenance-mode aws-sdk-go v1
// can still send the newer queryLanguage parameter to CloudWatch.
func withQueryLanguage(language *dataquery.LogsQueryLanguage) func(request *request.Request) {
	return func(request *request.Request) {
		// Guard the type assertion: if this option is ever attached to a
		// request whose params are not a StartQueryInput, leave the request
		// untouched rather than panicking inside the SDK's handler chain.
		sqi, ok := request.Params.(*cloudwatchlogs.StartQueryInput)
		if !ok {
			return
		}
		request.Params = &StartQueryInputWithLanguage{
			EndTime:             sqi.EndTime,
			Limit:               sqi.Limit,
			LogGroupIdentifiers: sqi.LogGroupIdentifiers,
			LogGroupName:        sqi.LogGroupName,
			LogGroupNames:       sqi.LogGroupNames,
			QueryString:         sqi.QueryString,
			QueryLanguage:       (*string)(language),
			StartTime:           sqi.StartTime,
		}
	}
}
func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
resp := backend.NewQueryDataResponse()
@@ -191,13 +232,21 @@ func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient c
if !startTime.Before(endTime) {
return nil, errorsource.DownstreamError(fmt.Errorf("invalid time range: start time must be before end time"), false)
}
if logsQuery.QueryLanguage == nil {
cwli := dataquery.LogsQueryLanguageCWLI
logsQuery.QueryLanguage = &cwli
}
finalQueryString := logsQuery.QueryString
// Only for CWLI queries
// The fields @log and @logStream are always included in the results of a user's query
// so that a row's context can be retrieved later if necessary.
// The usage of ltrim around the @log/@logStream fields is a necessary workaround, as without it,
// CloudWatch wouldn't consider a query using a non-aliased @log/@logStream valid.
modifiedQueryString := "fields @timestamp,ltrim(@log) as " + logIdentifierInternal + ",ltrim(@logStream) as " +
logStreamIdentifierInternal + "|" + logsQuery.QueryString
if *logsQuery.QueryLanguage == dataquery.LogsQueryLanguageCWLI {
finalQueryString = "fields @timestamp,ltrim(@log) as " + logIdentifierInternal + ",ltrim(@logStream) as " +
logStreamIdentifierInternal + "|" + logsQuery.QueryString
}
startQueryInput := &cloudwatchlogs.StartQueryInput{
StartTime: aws.Int64(startTime.Unix()),
@@ -207,20 +256,23 @@ func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient c
// and also a little bit more but as CW logs accept only seconds as integers there is not much to do about
// that.
EndTime: aws.Int64(int64(math.Ceil(float64(endTime.UnixNano()) / 1e9))),
QueryString: aws.String(modifiedQueryString),
QueryString: aws.String(finalQueryString),
}
if len(logsQuery.LogGroups) > 0 && features.IsEnabled(ctx, features.FlagCloudWatchCrossAccountQuerying) {
var logGroupIdentifiers []string
for _, lg := range logsQuery.LogGroups {
arn := lg.Arn
// due to a bug in the startQuery api, we remove * from the arn, otherwise it throws an error
logGroupIdentifiers = append(logGroupIdentifiers, strings.TrimSuffix(arn, "*"))
// log group identifiers can be left out if the query is an SQL query
if *logsQuery.QueryLanguage != dataquery.LogsQueryLanguageSQL {
if len(logsQuery.LogGroups) > 0 && features.IsEnabled(ctx, features.FlagCloudWatchCrossAccountQuerying) {
var logGroupIdentifiers []string
for _, lg := range logsQuery.LogGroups {
arn := lg.Arn
// due to a bug in the startQuery api, we remove * from the arn, otherwise it throws an error
logGroupIdentifiers = append(logGroupIdentifiers, strings.TrimSuffix(arn, "*"))
}
startQueryInput.LogGroupIdentifiers = aws.StringSlice(logGroupIdentifiers)
} else {
// even though log group names are being phased out, we still need to support them for backwards compatibility and alert queries
startQueryInput.LogGroupNames = aws.StringSlice(logsQuery.LogGroupNames)
}
startQueryInput.LogGroupIdentifiers = aws.StringSlice(logGroupIdentifiers)
} else {
// even though log group names are being phased out, we still need to support them for backwards compatibility and alert queries
startQueryInput.LogGroupNames = aws.StringSlice(logsQuery.LogGroupNames)
}
if logsQuery.Limit != nil {
@@ -228,7 +280,7 @@ func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient c
}
e.logger.FromContext(ctx).Debug("Calling startquery with context with input", "input", startQueryInput)
resp, err := logsClient.StartQueryWithContext(ctx, startQueryInput)
resp, err := logsClient.StartQueryWithContext(ctx, startQueryInput, WithQueryLanguage(logsQuery.QueryLanguage))
if err != nil {
var awsErr awserr.Error
if errors.As(err, &awsErr) && awsErr.Code() == "LimitExceededException" {

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
@@ -17,6 +18,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/features"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/mocks"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
@@ -309,6 +311,26 @@ func TestQuery_StartQuery(t *testing.T) {
})
}
// withQueryLanguageMock captures the language passed to WithQueryLanguage so
// tests can assert which query language was forwarded when starting a query.
type withQueryLanguageMock struct {
	// capturedLanguage records the last language the mock was invoked with.
	capturedLanguage *dataquery.LogsQueryLanguage
	// mockWithQueryLanguage is the replacement installed over the
	// package-level WithQueryLanguage variable for the duration of a test.
	mockWithQueryLanguage func(language *dataquery.LogsQueryLanguage) func(request *request.Request)
}
// newWithQueryLanguageMock builds a withQueryLanguageMock whose replacement
// function records the language it receives and hands back a no-op request
// option, so tests can swap it in for WithQueryLanguage and inspect the call.
func newWithQueryLanguageMock() *withQueryLanguageMock {
	mock := &withQueryLanguageMock{
		capturedLanguage: new(dataquery.LogsQueryLanguage),
	}
	mock.mockWithQueryLanguage = func(language *dataquery.LogsQueryLanguage) func(request *request.Request) {
		// Guard against a nil language so the mock leaves the zero value in
		// place instead of panicking if production code ever passes nil.
		if language != nil {
			*mock.capturedLanguage = *language
		}
		return func(req *request.Request) {}
	}
	return mock
}
func Test_executeStartQuery(t *testing.T) {
origNewCWLogsClient := NewCWLogsClient
t.Cleanup(func() {
@@ -321,40 +343,135 @@ func Test_executeStartQuery(t *testing.T) {
return &cli
}
t.Run("successfully parses information from JSON to StartQueryWithContext", func(t *testing.T) {
cli = fakeCWLogsClient{}
im := datasource.NewInstanceManager(func(ctx context.Context, s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return DataSource{Settings: models.CloudWatchSettings{}, sessions: &fakeSessionCache{}}, nil
})
executor := newExecutor(im, log.NewNullLogger())
_, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}},
Queries: []backend.DataQuery{
{
t.Run("successfully parses information from JSON to StartQueryWithContext for language", func(t *testing.T) {
testCases := map[string]struct {
queries []backend.DataQuery
expectedOutput []*cloudwatchlogs.StartQueryInput
queryLanguage dataquery.LogsQueryLanguage
}{
"not defined": {
queries: []backend.DataQuery{
{
RefID: "A",
TimeRange: backend.TimeRange{From: time.Unix(0, 0), To: time.Unix(1, 0)},
JSON: json.RawMessage(`{
"type": "logAction",
"subtype": "StartQuery",
"limit": 12,
"queryString":"fields @message",
"logGroupNames":["some name","another name"]
}`),
},
},
expectedOutput: []*cloudwatchlogs.StartQueryInput{{
StartTime: aws.Int64(0),
EndTime: aws.Int64(1),
Limit: aws.Int64(12),
QueryString: aws.String("fields @timestamp,ltrim(@log) as __log__grafana_internal__,ltrim(@logStream) as __logstream__grafana_internal__|fields @message"),
LogGroupNames: []*string{aws.String("some name"), aws.String("another name")},
}},
queryLanguage: dataquery.LogsQueryLanguageCWLI,
},
"CWLI": {
queries: []backend.DataQuery{{
RefID: "A",
TimeRange: backend.TimeRange{From: time.Unix(0, 0), To: time.Unix(1, 0)},
JSON: json.RawMessage(`{
"type": "logAction",
"subtype": "StartQuery",
"limit": 12,
"queryLanguage": "CWLI",
"queryString":"fields @message",
"logGroupNames":["some name","another name"]
}`),
}},
expectedOutput: []*cloudwatchlogs.StartQueryInput{
{
StartTime: aws.Int64(0),
EndTime: aws.Int64(1),
Limit: aws.Int64(12),
QueryString: aws.String("fields @timestamp,ltrim(@log) as __log__grafana_internal__,ltrim(@logStream) as __logstream__grafana_internal__|fields @message"),
LogGroupNames: []*string{aws.String("some name"), aws.String("another name")},
},
},
queryLanguage: dataquery.LogsQueryLanguageCWLI,
},
})
"PPL": {
queries: []backend.DataQuery{{
RefID: "A",
TimeRange: backend.TimeRange{From: time.Unix(0, 0), To: time.Unix(1, 0)},
JSON: json.RawMessage(`{
"type": "logAction",
"subtype": "StartQuery",
"limit": 12,
"queryLanguage": "PPL",
"queryString":"source logs | fields @message",
"logGroupNames":["some name","another name"]
}`),
}},
expectedOutput: []*cloudwatchlogs.StartQueryInput{
{
StartTime: aws.Int64(0),
EndTime: aws.Int64(1),
Limit: aws.Int64(12),
QueryString: aws.String("source logs | fields @message"),
LogGroupNames: []*string{aws.String("some name"), aws.String("another name")},
},
},
queryLanguage: dataquery.LogsQueryLanguagePPL,
},
"SQL": {
queries: []backend.DataQuery{
{
RefID: "A",
TimeRange: backend.TimeRange{From: time.Unix(0, 0), To: time.Unix(1, 0)},
JSON: json.RawMessage(`{
"type": "logAction",
"subtype": "StartQuery",
"limit": 12,
"queryLanguage": "SQL",
"queryString":"SELECT * FROM logs",
"logGroupNames":["some name","another name"]
}`),
},
},
expectedOutput: []*cloudwatchlogs.StartQueryInput{
{
StartTime: aws.Int64(0),
EndTime: aws.Int64(1),
Limit: aws.Int64(12),
QueryString: aws.String("SELECT * FROM logs"),
LogGroupNames: nil,
},
},
queryLanguage: dataquery.LogsQueryLanguageSQL,
},
}
for name, test := range testCases {
t.Run(name, func(t *testing.T) {
cli = fakeCWLogsClient{}
im := datasource.NewInstanceManager(func(ctx context.Context, s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return DataSource{Settings: models.CloudWatchSettings{}, sessions: &fakeSessionCache{}}, nil
})
executor := newExecutor(im, log.NewNullLogger())
assert.NoError(t, err)
assert.Equal(t, []*cloudwatchlogs.StartQueryInput{
{
StartTime: aws.Int64(0),
EndTime: aws.Int64(1),
Limit: aws.Int64(12),
QueryString: aws.String("fields @timestamp,ltrim(@log) as __log__grafana_internal__,ltrim(@logStream) as __logstream__grafana_internal__|fields @message"),
LogGroupNames: []*string{aws.String("some name"), aws.String("another name")},
},
}, cli.calls.startQueryWithContext)
languageMock := newWithQueryLanguageMock()
originalWithQueryLanguage := WithQueryLanguage
WithQueryLanguage = languageMock.mockWithQueryLanguage
defer func() {
WithQueryLanguage = originalWithQueryLanguage
}()
_, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}},
Queries: test.queries,
})
assert.NoError(t, err)
assert.Equal(t, test.expectedOutput, cli.calls.startQueryWithContext)
assert.Equal(t, &test.queryLanguage, languageMock.capturedLanguage)
})
}
})
t.Run("does not populate StartQueryInput.limit when no limit provided", func(t *testing.T) {
@@ -400,6 +517,7 @@ func Test_executeStartQuery(t *testing.T) {
"type": "logAction",
"subtype": "StartQuery",
"limit": 12,
"queryLanguage": "CWLI",
"queryString":"fields @message",
"logGroups":[{"arn": "fakeARN"}]
}`),

View File

@@ -54,8 +54,10 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d
if _, exists := fieldValues[*resultField.Field]; !exists {
fieldNames = append(fieldNames, *resultField.Field)
// Check if it's a time field
if _, err := time.Parse(cloudWatchTSFormat, *resultField.Value); err == nil {
// Check if it's a cloudWatchTSFormat field or one of the known timestamp fields:
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_AnalyzeLogData-discoverable-fields.html
// which can be in a millisecond format as well as cloudWatchTSFormat string format
if _, err := time.Parse(cloudWatchTSFormat, *resultField.Value); err == nil || isTimestampField(*resultField.Field) {
fieldValues[*resultField.Field] = make([]*time.Time, rowCount)
} else if _, err := strconv.ParseFloat(*resultField.Value, 64); err == nil {
fieldValues[*resultField.Field] = make([]*float64, rowCount)
@@ -67,9 +69,13 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d
if timeField, ok := fieldValues[*resultField.Field].([]*time.Time); ok {
parsedTime, err := time.Parse(cloudWatchTSFormat, *resultField.Value)
if err != nil {
return nil, err
unixTimeMs, err := strconv.ParseInt(*resultField.Value, 10, 64)
if err == nil {
parsedTime = time.Unix(unixTimeMs/1000, (unixTimeMs%1000)*int64(time.Millisecond))
} else {
return nil, err
}
}
timeField[i] = &parsedTime
} else if numericField, ok := fieldValues[*resultField.Field].([]*float64); ok {
parsedFloat, err := strconv.ParseFloat(*resultField.Value, 64)
@@ -313,3 +319,7 @@ func numericFieldToStringField(field *data.Field) (*data.Field, error) {
return newField, nil
}
// isTimestampField reports whether fieldName is one of the well-known
// CloudWatch Logs discoverable timestamp fields, which may be returned in
// millisecond-epoch form rather than the cloudWatchTSFormat string layout.
func isTimestampField(fieldName string) bool {
	switch fieldName {
	case "@timestamp", "@ingestionTime":
		return true
	default:
		return false
	}
}

View File

@@ -1,6 +1,7 @@
package cloudwatch
import (
"fmt"
"testing"
"time"
@@ -277,6 +278,72 @@ func TestLogsResultsToDataframes_MixedTypes_NumericValuesMixedWithStringFallBack
assert.ElementsMatch(t, expectedDataframe.Fields, dataframes.Fields)
}
// TestLogsResultsToDataframes_With_Millisecond_Timestamps verifies that the
// well-known @timestamp and @ingestionTime fields are parsed from their
// millisecond-epoch string form, while other time-like fields are still
// parsed with the cloudWatchTSFormat layout and plain strings pass through.
func TestLogsResultsToDataframes_With_Millisecond_Timestamps(t *testing.T) {
	const (
		timestampMs  = int64(1732749534876)
		ingestionMs  = int64(1732790372916)
		formattedStr = "2020-03-02 15:04:05.000"
	)

	// One result row mixing millisecond-epoch timestamps, a string-formatted
	// time, and an ordinary string field.
	row := []*cloudwatchlogs.ResultField{
		{Field: aws.String("@timestamp"), Value: aws.String(fmt.Sprintf("%d", timestampMs))},
		{Field: aws.String("@ingestionTime"), Value: aws.String(fmt.Sprintf("%d", ingestionMs))},
		{Field: aws.String("stringTimeField"), Value: aws.String(formattedStr)},
		{Field: aws.String("message"), Value: aws.String("log message")},
	}

	dataframes, err := logsResultsToDataframes(&cloudwatchlogs.GetQueryResultsOutput{
		Results: [][]*cloudwatchlogs.ResultField{row},
		Status:  aws.String("ok"),
	})
	require.NoError(t, err)

	// Millisecond values should round-trip to second + nanosecond precision;
	// the formatted value should be parsed with the cloudWatchTSFormat layout.
	wantTimestamp := time.Unix(timestampMs/1000, (timestampMs%1000)*int64(time.Millisecond))
	wantIngestion := time.Unix(ingestionMs/1000, (ingestionMs%1000)*int64(time.Millisecond))
	wantFormatted, err := time.Parse(cloudWatchTSFormat, formattedStr)
	require.NoError(t, err)

	want := &data.Frame{
		Name: "CloudWatchLogsResponse",
		Fields: []*data.Field{
			data.NewField("@timestamp", nil, []*time.Time{&wantTimestamp}),
			data.NewField("@ingestionTime", nil, []*time.Time{&wantIngestion}),
			data.NewField("stringTimeField", nil, []*time.Time{&wantFormatted}),
			data.NewField("message", nil, []*string{aws.String("log message")}),
		},
		RefID: "",
		Meta: &data.FrameMeta{
			Custom: map[string]any{
				"Status": "ok",
			},
		},
	}
	want.Fields[0].SetConfig(&data.FieldConfig{DisplayName: "Time"})

	assert.Equal(t, want.Name, dataframes.Name)
	assert.Equal(t, want.RefID, dataframes.RefID)
	assert.Equal(t, want.Meta, dataframes.Meta)
	assert.ElementsMatch(t, want.Fields, dataframes.Fields)
}
func TestGroupKeyGeneration(t *testing.T) {
logField := data.NewField("@log", data.Labels{}, []*string{
aws.String("fakelog-a"),