diff --git a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md index 285b8f2cff3..15b8d8d4bfd 100644 --- a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md +++ b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md @@ -145,6 +145,7 @@ Experimental features might be changed or removed without prior notice. | `kubernetesPlaylists` | Use the kubernetes API in the frontend for playlists, and route /api/playlist requests to k8s | | `navAdminSubsections` | Splits the administration section of the nav tree into subsections | | `recoveryThreshold` | Enables feature recovery threshold (aka hysteresis) for threshold server-side expression | +| `lokiStructuredMetadata` | Enables the loki data source to request structured metadata from the Loki server | | `teamHttpHeaders` | Enables datasources to apply team headers to the client requests | | `awsDatasourcesNewFormStyling` | Applies new form styling for configuration and query editors in AWS plugins | | `cachingOptimizeSerializationMemoryUsage` | If enabled, the caching backend gradually serializes query responses for the cache, comparing against the configured `[caching]max_value_mb` value as it goes. This can can help prevent Grafana from running out of memory while attempting to cache very large query responses. | diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts index 8f75b471e95..31e2e691b05 100644 --- a/packages/grafana-data/src/types/featureToggles.gen.ts +++ b/packages/grafana-data/src/types/featureToggles.gen.ts @@ -138,6 +138,7 @@ export interface FeatureToggles { cloudWatchBatchQueries?: boolean; navAdminSubsections?: boolean; recoveryThreshold?: boolean; + lokiStructuredMetadata?: boolean; teamHttpHeaders?: boolean; awsDatasourcesNewFormStyling?: boolean; cachingOptimizeSerializationMemoryUsage?: boolean; diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go index 779d7fe14e9..5cbddeca68c 100644 --- a/pkg/services/featuremgmt/registry.go +++ b/pkg/services/featuremgmt/registry.go @@ -893,6 +893,13 @@ var ( Owner: grafanaAlertingSquad, RequiresRestart: true, }, + { + Name: "lokiStructuredMetadata", + Description: "Enables the loki data source to request structured metadata from the Loki server", + Stage: FeatureStageExperimental, + FrontendOnly: false, + Owner: grafanaObservabilityLogsSquad, + }, { Name: "teamHttpHeaders", Description: "Enables datasources to apply team headers to the client requests", diff --git a/pkg/services/featuremgmt/toggles_gen.csv b/pkg/services/featuremgmt/toggles_gen.csv index 826dd69f5e3..512a32f9098 100644 --- a/pkg/services/featuremgmt/toggles_gen.csv +++ b/pkg/services/featuremgmt/toggles_gen.csv @@ -119,6 +119,7 @@ kubernetesPlaylists,experimental,@grafana/grafana-app-platform-squad,false,false cloudWatchBatchQueries,preview,@grafana/aws-datasources,false,false,false,false navAdminSubsections,experimental,@grafana/grafana-frontend-platform,false,false,false,false recoveryThreshold,experimental,@grafana/alerting-squad,false,false,true,false +lokiStructuredMetadata,experimental,@grafana/observability-logs,false,false,false,false teamHttpHeaders,experimental,@grafana/identity-access-team,false,false,false,false awsDatasourcesNewFormStyling,experimental,@grafana/aws-datasources,false,false,false,true cachingOptimizeSerializationMemoryUsage,experimental,@grafana/grafana-operator-experience-squad,false,false,false,false diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go index e6234c6d826..1c4f47cc32b 100644 --- a/pkg/services/featuremgmt/toggles_gen.go +++ b/pkg/services/featuremgmt/toggles_gen.go @@ -487,6 +487,10 @@ const ( // Enables feature recovery threshold (aka hysteresis) for threshold server-side expression FlagRecoveryThreshold = "recoveryThreshold" + // FlagLokiStructuredMetadata + // Enables the loki data source to request structured metadata from the Loki server + FlagLokiStructuredMetadata = "lokiStructuredMetadata" + // FlagTeamHttpHeaders // Enables datasources to apply team headers to the client requests FlagTeamHttpHeaders = "teamHttpHeaders" diff --git a/pkg/tsdb/loki/api.go b/pkg/tsdb/loki/api.go index b1a9dfd404c..434d7a76503 100644 --- a/pkg/tsdb/loki/api.go +++ b/pkg/tsdb/loki/api.go @@ -26,10 +26,11 @@ import ( ) type LokiAPI struct { - client *http.Client - url string - log log.Logger - tracer tracing.Tracer + client *http.Client + url string + log log.Logger + tracer tracing.Tracer + requestStructuredMetadata bool } type RawLokiResponse struct { @@ -38,11 +39,11 @@ type RawLokiResponse struct { Encoding string } -func newLokiAPI(client *http.Client, url string, log log.Logger, tracer tracing.Tracer) *LokiAPI { - return &LokiAPI{client: client, url: url, log: log, tracer: tracer} +func newLokiAPI(client *http.Client, url string, log log.Logger, tracer tracing.Tracer, requestStructuredMetadata bool) *LokiAPI { + return &LokiAPI{client: client, url: url, log: log, tracer: tracer, requestStructuredMetadata: requestStructuredMetadata} } -func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*http.Request, error) { +func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery, categorizeLabels bool) (*http.Request, error) { qs := url.Values{} qs.Set("query", query.Expr) @@ -102,6 +103,10 @@ func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*h } } + if categorizeLabels { + req.Header.Set("X-Loki-Response-Encoding-Flags", "categorize-labels") + } + return req, nil } @@ -156,7 +161,7 @@ func readLokiError(body io.ReadCloser) error { } func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts ResponseOpts) (data.Frames, error) { - req, err := makeDataRequest(ctx, api.url, query) + req, err := makeDataRequest(ctx, api.url, query, api.requestStructuredMetadata) if err != nil { return nil, err } diff --git a/pkg/tsdb/loki/api_mock.go b/pkg/tsdb/loki/api_mock.go index bd32f443f24..773057724a4 100644 --- a/pkg/tsdb/loki/api_mock.go +++ b/pkg/tsdb/loki/api_mock.go @@ -56,16 +56,16 @@ func (mockedRT *mockedCompressedRoundTripper) RoundTrip(req *http.Request) (*htt }, nil } -func makeMockedAPI(statusCode int, contentType string, responseBytes []byte, requestCallback mockRequestCallback) *LokiAPI { - return makeMockedAPIWithUrl("http://localhost:9999", statusCode, contentType, responseBytes, requestCallback) +func makeMockedAPI(statusCode int, contentType string, responseBytes []byte, requestCallback mockRequestCallback, structuredMetadata bool) *LokiAPI { + return makeMockedAPIWithUrl("http://localhost:9999", statusCode, contentType, responseBytes, requestCallback, structuredMetadata) } -func makeMockedAPIWithUrl(url string, statusCode int, contentType string, responseBytes []byte, requestCallback mockRequestCallback) *LokiAPI { +func makeMockedAPIWithUrl(url string, statusCode int, contentType string, responseBytes []byte, requestCallback mockRequestCallback, structuredMetadata bool) *LokiAPI { client := http.Client{ Transport: &mockedRoundTripper{statusCode: statusCode, contentType: contentType, responseBytes: responseBytes, requestCallback: requestCallback}, } - return newLokiAPI(&client, url, log.New("test"), tracing.InitializeTracerForTest()) + return newLokiAPI(&client, url, log.New("test"), tracing.InitializeTracerForTest(), structuredMetadata) } func makeCompressedMockedAPIWithUrl(url string, statusCode int, contentType string, responseBytes []byte, requestCallback mockRequestCallback) *LokiAPI { @@ -73,5 +73,5 @@ func makeCompressedMockedAPIWithUrl(url string, statusCode int, contentType stri Transport: &mockedCompressedRoundTripper{statusCode: statusCode, contentType: contentType, responseBytes: responseBytes, requestCallback: requestCallback}, } - return newLokiAPI(&client, url, log.New("test"), tracing.InitializeTracerForTest()) + return newLokiAPI(&client, url, log.New("test"), tracing.InitializeTracerForTest(), false) } diff --git a/pkg/tsdb/loki/api_test.go b/pkg/tsdb/loki/api_test.go index 6adc839f8ad..cbe964dbbe9 100644 --- a/pkg/tsdb/loki/api_test.go +++ b/pkg/tsdb/loki/api_test.go @@ -26,7 +26,7 @@ func TestApiLogVolume(t *testing.T) { api := makeMockedAPI(200, "application/json", response, func(req *http.Request) { called = true require.Equal(t, "Source=logvolhist", req.Header.Get("X-Query-Tags")) - }) + }, false) _, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsVolume, QueryType: QueryTypeRange}, ResponseOpts{}) require.NoError(t, err) @@ -38,7 +38,7 @@ func TestApiLogVolume(t *testing.T) { api := makeMockedAPI(200, "application/json", response, func(req *http.Request) { called = true require.Equal(t, "Source=logsample", req.Header.Get("X-Query-Tags")) - }) + }, false) _, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange}, ResponseOpts{}) require.NoError(t, err) @@ -50,7 +50,7 @@ func TestApiLogVolume(t *testing.T) { api := makeMockedAPI(200, "application/json", response, func(req *http.Request) { called = true require.Equal(t, "Source=datasample", req.Header.Get("X-Query-Tags")) - }) + }, false) _, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryDataSample, QueryType: QueryTypeRange}, ResponseOpts{}) require.NoError(t, err) @@ -62,12 +62,24 @@ func TestApiLogVolume(t *testing.T) { api := makeMockedAPI(200, "application/json", response, func(req *http.Request) { called = true require.Equal(t, "", req.Header.Get("X-Query-Tags")) - }) + }, false) _, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryNone, QueryType: QueryTypeRange}, ResponseOpts{}) require.NoError(t, err) require.True(t, called) }) + + t.Run("with `structuredMetadata` should set correct http header", func(t *testing.T) { + called := false + api := makeMockedAPI(200, "application/json", response, func(req *http.Request) { + called = true + require.Equal(t, "categorize-labels", req.Header.Get("X-Loki-Response-Encoding-Flags")) + }, true) + + _, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsVolume, QueryType: QueryTypeRange}, ResponseOpts{}) + require.NoError(t, err) + require.True(t, called) + }) } func TestApiUrlHandling(t *testing.T) { @@ -127,7 +139,7 @@ func TestApiUrlHandling(t *testing.T) { wantedPrefix := test.rangeQueryPrefix failMessage := fmt.Sprintf(`wanted prefix: [%s], got string [%s]`, wantedPrefix, urlString) require.True(t, strings.HasPrefix(urlString, wantedPrefix), failMessage) - }) + }, false) query := lokiQuery{ QueryType: QueryTypeRange, @@ -148,7 +160,7 @@ func TestApiUrlHandling(t *testing.T) { wantedPrefix := test.instantQueryPrefix failMessage := fmt.Sprintf(`wanted prefix: [%s], got string [%s]`, wantedPrefix, urlString) require.True(t, strings.HasPrefix(urlString, wantedPrefix), failMessage) - }) + }, false) query := lokiQuery{ QueryType: QueryTypeInstant, @@ -166,7 +178,7 @@ func TestApiUrlHandling(t *testing.T) { api := makeMockedAPIWithUrl(test.dsUrl, 200, "application/json", response, func(req *http.Request) { called = true require.Equal(t, test.metaUrl, req.URL.String()) - }) + }, false) _, err := api.RawQuery(context.Background(), "/loki/api/v1/labels?start=1&end=2") require.NoError(t, err) diff --git a/pkg/tsdb/loki/frame.go b/pkg/tsdb/loki/frame.go index 132858ee98c..8655b746aca 100644 --- a/pkg/tsdb/loki/frame.go +++ b/pkg/tsdb/loki/frame.go @@ -93,14 +93,26 @@ func adjustLogsFrame(frame *data.Frame, query *lokiQuery, dataplane bool) error func adjustLegacyLogsFrame(frame *data.Frame, query *lokiQuery) error { // we check if the fields are of correct type and length fields := frame.Fields - if len(fields) != 4 { - return fmt.Errorf("invalid field length in logs frame. expected 4, got %d", len(fields)) + if len(fields) != 4 && len(fields) != 5 { + return fmt.Errorf("invalid field length in logs frame. expected 4 or 5, got %d", len(fields)) } labelsField := fields[0] timeField := fields[1] lineField := fields[2] stringTimeField := fields[3] + if len(fields) == 5 { + labelTypesField := fields[4] + if labelTypesField.Type() != data.FieldTypeJSON { + return fmt.Errorf("invalid field types in logs frame. expected json, got %s", labelTypesField.Type()) + } + labelTypesField.Name = "labelTypes" + labelTypesField.Config = &data.FieldConfig{ + Custom: map[string]interface{}{ + "hidden": true, + }, + } + } if (timeField.Type() != data.FieldTypeTime) || (lineField.Type() != data.FieldTypeString) || (labelsField.Type() != data.FieldTypeJSON) || (stringTimeField.Type() != data.FieldTypeString) { return fmt.Errorf("invalid field types in logs frame. expected time, string, json and string, got %s, %s, %s and %s", timeField.Type(), lineField.Type(), labelsField.Type(), stringTimeField.Type()) @@ -149,14 +161,27 @@ func adjustLegacyLogsFrame(frame *data.Frame, query *lokiQuery) error { func adjustDataplaneLogsFrame(frame *data.Frame, query *lokiQuery) error { // we check if the fields are of correct type and length fields := frame.Fields - if len(fields) != 4 { - return fmt.Errorf("invalid field length in logs frame. expected 4, got %d", len(fields)) + if len(fields) != 4 && len(fields) != 5 { + return fmt.Errorf("invalid field length in logs frame. expected 4 or 5, got %d", len(fields)) } labelsField := fields[0] timeField := fields[1] lineField := fields[2] stringTimeField := fields[3] + var labelTypesField *data.Field + if len(fields) == 5 { + labelTypesField = fields[4] + if labelTypesField.Type() != data.FieldTypeJSON { + return fmt.Errorf("invalid field types in logs frame. expected json, got %s", labelTypesField.Type()) + } + labelTypesField.Name = "labelTypes" + labelTypesField.Config = &data.FieldConfig{ + Custom: map[string]interface{}{ + "hidden": true, + }, + } + } if (timeField.Type() != data.FieldTypeTime) || (lineField.Type() != data.FieldTypeString) || (labelsField.Type() != data.FieldTypeJSON) || (stringTimeField.Type() != data.FieldTypeString) { return fmt.Errorf("invalid field types in logs frame. expected time, string, json and string, got %s, %s, %s and %s", timeField.Type(), lineField.Type(), labelsField.Type(), stringTimeField.Type()) @@ -190,7 +215,12 @@ func adjustDataplaneLogsFrame(frame *data.Frame, query *lokiQuery) error { if err != nil { return err } - frame.Fields = data.Fields{labelsField, timeField, lineField, idField} + + if labelTypesField != nil { + frame.Fields = data.Fields{labelsField, timeField, lineField, idField, labelTypesField} + } else { + frame.Fields = data.Fields{labelsField, timeField, lineField, idField} + } return nil } diff --git a/pkg/tsdb/loki/framing_test.go b/pkg/tsdb/loki/framing_test.go index 056b0e8ac92..5480fe3abcc 100644 --- a/pkg/tsdb/loki/framing_test.go +++ b/pkg/tsdb/loki/framing_test.go @@ -51,6 +51,8 @@ func TestSuccessResponse(t *testing.T) { {name: "parse a streams response with parse errors", filepath: "streams_parse_errors", query: streamsQuery}, {name: "parse an empty response", filepath: "empty", query: matrixQuery}, + + {name: "parse structured metadata", filepath: "streams_structured_metadata", query: streamsQuery}, } runTest := func(folder string, path string, query lokiQuery, responseOpts ResponseOpts) { @@ -61,14 +63,14 @@ func TestSuccessResponse(t *testing.T) { bytes, err := os.ReadFile(responseFileName) require.NoError(t, err) - frames, err := runQuery(context.Background(), makeMockedAPI(http.StatusOK, "application/json", bytes, nil), &query, responseOpts, log.New("test")) + frames, err := runQuery(context.Background(), makeMockedAPI(http.StatusOK, "application/json", bytes, nil, false), &query, responseOpts, log.New("test")) require.NoError(t, err) dr := &backend.DataResponse{ Frames: frames, Error: err, } - experimental.CheckGoldenJSONResponse(t, folder, goldenFileName, dr, true) + experimental.CheckGoldenJSONResponse(t, folder, goldenFileName, dr, false) } for _, test := range tt { @@ -126,7 +128,7 @@ func TestErrorResponse(t *testing.T) { for _, test := range tt { t.Run(test.name, func(t *testing.T) { - frames, err := runQuery(context.Background(), makeMockedAPI(400, test.contentType, test.body, nil), &lokiQuery{QueryType: QueryTypeRange, Direction: DirectionBackward}, ResponseOpts{}, log.New("test")) + frames, err := runQuery(context.Background(), makeMockedAPI(400, test.contentType, test.body, nil, false), &lokiQuery{QueryType: QueryTypeRange, Direction: DirectionBackward}, ResponseOpts{}, log.New("test")) require.Len(t, frames, 0) require.Error(t, err) diff --git a/pkg/tsdb/loki/loki.go b/pkg/tsdb/loki/loki.go index 138da10fdb8..6a6e4f59128 100644 --- a/pkg/tsdb/loki/loki.go +++ b/pkg/tsdb/loki/loki.go @@ -140,7 +140,7 @@ func callResource(ctx context.Context, req *backend.CallResourceRequest, sender )) defer span.End() - api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer) + api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer, false) rawLokiResponse, err := api.RawQuery(ctx, lokiURL) if err != nil { @@ -177,13 +177,13 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) logsDataplane: s.features.IsEnabled(ctx, featuremgmt.FlagLokiLogsDataplane), } - return queryData(ctx, req, dsInfo, responseOpts, s.tracer, logger, s.features.IsEnabled(ctx, featuremgmt.FlagLokiRunQueriesInParallel)) + return queryData(ctx, req, dsInfo, responseOpts, s.tracer, logger, s.features.IsEnabled(ctx, featuremgmt.FlagLokiRunQueriesInParallel), s.features.IsEnabled(ctx, featuremgmt.FlagLokiStructuredMetadata)) } -func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo, responseOpts ResponseOpts, tracer tracing.Tracer, plog log.Logger, runInParallel bool) (*backend.QueryDataResponse, error) { +func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo, responseOpts ResponseOpts, tracer tracing.Tracer, plog log.Logger, runInParallel bool, requestStructuredMetadata bool) (*backend.QueryDataResponse, error) { result := backend.NewQueryDataResponse() - api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer) + api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer, requestStructuredMetadata) start := time.Now() queries, err := parseQuery(req) diff --git a/pkg/tsdb/loki/loki_bench_test.go b/pkg/tsdb/loki/loki_bench_test.go index a12579623c0..bebc4ada270 100644 --- a/pkg/tsdb/loki/loki_bench_test.go +++ b/pkg/tsdb/loki/loki_bench_test.go @@ -19,7 +19,7 @@ func BenchmarkMatrixJson(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - _, _ = runQuery(context.Background(), makeMockedAPI(http.StatusOK, "application/json", bytes, nil), &lokiQuery{}, ResponseOpts{}, log.New("test")) + _, _ = runQuery(context.Background(), makeMockedAPI(http.StatusOK, "application/json", bytes, nil, false), &lokiQuery{}, ResponseOpts{}, log.New("test")) } } diff --git a/pkg/tsdb/loki/testdata/streams_structured_metadata.golden.jsonc b/pkg/tsdb/loki/testdata/streams_structured_metadata.golden.jsonc new file mode 100644 index 00000000000..9ba619aeb9e --- /dev/null +++ b/pkg/tsdb/loki/testdata/streams_structured_metadata.golden.jsonc @@ -0,0 +1,380 @@ +// 🌟 This was machine generated. Do not edit. 🌟 +// +// Frame[0] { +// "typeVersion": [ +// 0, +// 0 +// ], +// "custom": { +// "frameType": "LabeledTimeValues" +// }, +// "stats": [ +// { +// "displayName": "Summary: bytes processed per second", +// "unit": "Bps", +// "value": 3507022 +// }, +// { +// "displayName": "Summary: lines processed per second", +// "value": 24818 +// }, +// { +// "displayName": "Summary: total bytes processed", +// "unit": "decbytes", +// "value": 7772 +// }, +// { +// "displayName": "Summary: total lines processed", +// "value": 55 +// }, +// { +// "displayName": "Summary: exec time", +// "unit": "s", +// "value": 0.002216125 +// }, +// { +// "displayName": "Store: total chunks ref", +// "value": 2 +// }, +// { +// "displayName": "Store: total chunks downloaded", +// "value": 3 +// }, +// { +// "displayName": "Store: chunks download time", +// "unit": "s", +// "value": 0.000390958 +// }, +// { +// "displayName": "Store: head chunk bytes", +// "unit": "decbytes", +// "value": 4 +// }, +// { +// "displayName": "Store: head chunk lines", +// "value": 5 +// }, +// { +// "displayName": "Store: decompressed bytes", +// "unit": "decbytes", +// "value": 7772 +// }, +// { +// "displayName": "Store: decompressed lines", +// "value": 55 +// }, +// { +// "displayName": "Store: compressed bytes", +// "unit": "decbytes", +// "value": 31432 +// }, +// { +// "displayName": "Store: total duplicates", +// "value": 6 +// }, +// { +// "displayName": "Ingester: total reached", +// "value": 7 +// }, +// { +// "displayName": "Ingester: total chunks matched", +// "value": 8 +// }, +// { +// "displayName": "Ingester: total batches", +// "value": 9 +// }, +// { +// "displayName": "Ingester: total lines sent", +// "value": 10 +// }, +// { +// "displayName": "Ingester: head chunk bytes", +// "unit": "decbytes", +// "value": 11 +// }, +// { +// "displayName": "Ingester: head chunk lines", +// "value": 12 +// }, +// { +// "displayName": "Ingester: decompressed bytes", +// "unit": "decbytes", +// "value": 13 +// }, +// { +// "displayName": "Ingester: decompressed lines", +// "value": 14 +// }, +// { +// "displayName": "Ingester: compressed bytes", +// "unit": "decbytes", +// "value": 15 +// }, +// { +// "displayName": "Ingester: total duplicates", +// "value": 16 +// } +// ], +// "executedQueryString": "Expr: query1" +// } +// Name: +// Dimensions: 6 Fields by 6 Rows +// +-------------------------+-----------------------------------------+------------------+---------------------+-------------------------+--------------------------------+ +// | Name: labels | Name: Time | Name: Line | Name: tsNs | Name: labelTypes | Name: id | +// | Labels: | Labels: | Labels: | Labels: | Labels: | Labels: | +// | Type: []json.RawMessage | Type: []time.Time | Type: []string | Type: []string | Type: []json.RawMessage | Type: []string | +// +-------------------------+-----------------------------------------+------------------+---------------------+-------------------------+--------------------------------+ +// | {} | 2022-02-16 16:50:44.81075712 +0000 UTC | log line error 1 | 1645030244810757120 | {} | 1645030244810757120_7bc6af55 | +// | {} | 2022-02-16 16:50:47.02773504 +0000 UTC | log line info 1 | 1645030247027735040 | {} | 1645030247027735040_2c9e69b9 | +// | {} | 2022-02-16 16:50:46.277587968 +0000 UTC | log line info 2 | 1645030246277587968 | {} | 1645030246277587968_b7784d5c | +// | {} | 2022-02-16 16:50:46.277587968 +0000 UTC | log line info 2 | 1645030246277587968 | {} | 1645030246277587968_b7784d5c_1 | +// | {} | 2022-02-16 16:50:45.539423744 +0000 UTC | log line info 3 | 1645030245539423744 | {} | 1645030245539423744_4aee7aab | +// | {} | 2022-02-16 16:50:44.091700992 +0000 UTC | log line info 4 | 1645030244091700992 | {} | 1645030244091700992_df059f36 | +// +-------------------------+-----------------------------------------+------------------+---------------------+-------------------------+--------------------------------+ +// +// +// 🌟 This was machine generated. Do not edit. 🌟 +{ + "status": 200, + "frames": [ + { + "schema": { + "meta": { + "typeVersion": [ + 0, + 0 + ], + "custom": { + "frameType": "LabeledTimeValues" + }, + "stats": [ + { + "displayName": "Summary: bytes processed per second", + "unit": "Bps", + "value": 3507022 + }, + { + "displayName": "Summary: lines processed per second", + "value": 24818 + }, + { + "displayName": "Summary: total bytes processed", + "unit": "decbytes", + "value": 7772 + }, + { + "displayName": "Summary: total lines processed", + "value": 55 + }, + { + "displayName": "Summary: exec time", + "unit": "s", + "value": 0.002216125 + }, + { + "displayName": "Store: total chunks ref", + "value": 2 + }, + { + "displayName": "Store: total chunks downloaded", + "value": 3 + }, + { + "displayName": "Store: chunks download time", + "unit": "s", + "value": 0.000390958 + }, + { + "displayName": "Store: head chunk bytes", + "unit": "decbytes", + "value": 4 + }, + { + "displayName": "Store: head chunk lines", + "value": 5 + }, + { + "displayName": "Store: decompressed bytes", + "unit": "decbytes", + "value": 7772 + }, + { + "displayName": "Store: decompressed lines", + "value": 55 + }, + { + "displayName": "Store: compressed bytes", + "unit": "decbytes", + "value": 31432 + }, + { + "displayName": "Store: total duplicates", + "value": 6 + }, + { + "displayName": "Ingester: total reached", + "value": 7 + }, + { + "displayName": "Ingester: total chunks matched", + "value": 8 + }, + { + "displayName": "Ingester: total batches", + "value": 9 + }, + { + "displayName": "Ingester: total lines sent", + "value": 10 + }, + { + "displayName": "Ingester: head chunk bytes", + "unit": "decbytes", + "value": 11 + }, + { + "displayName": "Ingester: head chunk lines", + "value": 12 + }, + { + "displayName": "Ingester: decompressed bytes", + "unit": "decbytes", + "value": 13 + }, + { + "displayName": "Ingester: decompressed lines", + "value": 14 + }, + { + "displayName": "Ingester: compressed bytes", + "unit": "decbytes", + "value": 15 + }, + { + "displayName": "Ingester: total duplicates", + "value": 16 + } + ], + "executedQueryString": "Expr: query1" + }, + "fields": [ + { + "name": "labels", + "type": "other", + "typeInfo": { + "frame": "json.RawMessage" + } + }, + { + "name": "Time", + "type": "time", + "typeInfo": { + "frame": "time.Time" + } + }, + { + "name": "Line", + "type": "string", + "typeInfo": { + "frame": "string" + } + }, + { + "name": "tsNs", + "type": "string", + "typeInfo": { + "frame": "string" + } + }, + { + "name": "labelTypes", + "type": "other", + "typeInfo": { + "frame": "json.RawMessage" + }, + "config": { + "custom": { + "hidden": true + } + } + }, + { + "name": "id", + "type": "string", + "typeInfo": { + "frame": "string" + } + } + ] + }, + "data": { + "values": [ + [ + {}, + {}, + {}, + {}, + {}, + {} + ], + [ + 1645030244810, + 1645030247027, + 1645030246277, + 1645030246277, + 1645030245539, + 1645030244091 + ], + [ + "log line error 1", + "log line info 1", + "log line info 2", + "log line info 2", + "log line info 3", + "log line info 4" + ], + [ + "1645030244810757120", + "1645030247027735040", + "1645030246277587968", + "1645030246277587968", + "1645030245539423744", + "1645030244091700992" + ], + [ + {}, + {}, + {}, + {}, + {}, + {} + ], + [ + "1645030244810757120_7bc6af55", + "1645030247027735040_2c9e69b9", + "1645030246277587968_b7784d5c", + "1645030246277587968_b7784d5c_1", + "1645030245539423744_4aee7aab", + "1645030244091700992_df059f36" + ] + ], + "nanos": [ + null, + [ + 757120, + 735040, + 587968, + 587968, + 423744, + 700992 + ], + null, + null, + null, + null + ] + } + } + ] +} \ No newline at end of file diff --git a/pkg/tsdb/loki/testdata/streams_structured_metadata.json b/pkg/tsdb/loki/testdata/streams_structured_metadata.json new file mode 100644 index 00000000000..760e15ca712 --- /dev/null +++ b/pkg/tsdb/loki/testdata/streams_structured_metadata.json @@ -0,0 +1,89 @@ +{ + "status": "success", + "data": { + "encodingFlags": [ + "categorize-labels" + ], + "resultType": "streams", + "result": [ + { + "stream": { + "code": "one\",", + "location": "moonπŸŒ™" + }, + "values": [ + [ + "1645030244810757120", + "log line error 1", + {} + ] + ] + }, + { + "stream": { + "code": "\",two", + "location": "moonπŸŒ™" + }, + "values": [ + [ + "1645030247027735040", + "log line info 1", + {} + ], + [ + "1645030246277587968", + "log line info 2", + {} + ], + [ + "1645030246277587968", + "log line info 2", + {} + ], + [ + "1645030245539423744", + "log line info 3", + {} + ], + [ + "1645030244091700992", + "log line info 4", + {} + ] + ] + } + ], + "stats": { + "summary": { + "bytesProcessedPerSecond": 3507022, + "linesProcessedPerSecond": 24818, + "totalBytesProcessed": 7772, + "totalLinesProcessed": 55, + "execTime": 0.002216125 + }, + "store": { + "totalChunksRef": 2, + "totalChunksDownloaded": 3, + "chunksDownloadTime": 0.000390958, + "headChunkBytes": 4, + "headChunkLines": 5, + "decompressedBytes": 7772, + "decompressedLines": 55, + "compressedBytes": 31432, + "totalDuplicates": 6 + }, + "ingester": { + "totalReached": 7, + "totalChunksMatched": 8, + "totalBatches": 9, + "totalLinesSent": 10, + "headChunkBytes": 11, + "headChunkLines": 12, + "decompressedBytes": 13, + "decompressedLines": 14, + "compressedBytes": 15, + "totalDuplicates": 16 + } + } + } +} diff --git a/pkg/tsdb/loki/testdata_dataplane/streams_structured_metadata.golden.jsonc b/pkg/tsdb/loki/testdata_dataplane/streams_structured_metadata.golden.jsonc new file mode 100644 index 00000000000..43884ad2c87 --- /dev/null +++ b/pkg/tsdb/loki/testdata_dataplane/streams_structured_metadata.golden.jsonc @@ -0,0 +1,360 @@ +// 🌟 This was machine generated. Do not edit. 🌟 +// +// Frame[0] { +// "type": "log-lines", +// "typeVersion": [ +// 0, +// 0 +// ], +// "stats": [ +// { +// "displayName": "Summary: bytes processed per second", +// "unit": "Bps", +// "value": 3507022 +// }, +// { +// "displayName": "Summary: lines processed per second", +// "value": 24818 +// }, +// { +// "displayName": "Summary: total bytes processed", +// "unit": "decbytes", +// "value": 7772 +// }, +// { +// "displayName": "Summary: total lines processed", +// "value": 55 +// }, +// { +// "displayName": "Summary: exec time", +// "unit": "s", +// "value": 0.002216125 +// }, +// { +// "displayName": "Store: total chunks ref", +// "value": 2 +// }, +// { +// "displayName": "Store: total chunks downloaded", +// "value": 3 +// }, +// { +// "displayName": "Store: chunks download time", +// "unit": "s", +// "value": 0.000390958 +// }, +// { +// "displayName": "Store: head chunk bytes", +// "unit": "decbytes", +// "value": 4 +// }, +// { +// "displayName": "Store: head chunk lines", +// "value": 5 +// }, +// { +// "displayName": "Store: decompressed bytes", +// "unit": "decbytes", +// "value": 7772 +// }, +// { +// "displayName": "Store: decompressed lines", +// "value": 55 +// }, +// { +// "displayName": "Store: compressed bytes", +// "unit": "decbytes", +// "value": 31432 +// }, +// { +// "displayName": "Store: total duplicates", +// "value": 6 +// }, +// { +// "displayName": "Ingester: total reached", +// "value": 7 +// }, +// { +// "displayName": "Ingester: total chunks matched", +// "value": 8 +// }, +// { +// "displayName": "Ingester: total batches", +// "value": 9 +// }, +// { +// "displayName": "Ingester: total lines sent", +// "value": 10 +// }, +// { +// "displayName": "Ingester: head chunk bytes", +// "unit": "decbytes", +// "value": 11 +// }, +// { +// "displayName": "Ingester: head chunk lines", +// "value": 12 +// }, +// { +// "displayName": "Ingester: decompressed bytes", +// "unit": "decbytes", +// "value": 13 +// }, +// { +// "displayName": "Ingester: decompressed lines", +// "value": 14 +// }, +// { +// "displayName": "Ingester: compressed bytes", +// "unit": "decbytes", +// "value": 15 +// }, +// { +// "displayName": "Ingester: total duplicates", +// "value": 16 +// } +// ], +// "executedQueryString": "Expr: query1" +// } +// Name: +// Dimensions: 5 Fields by 6 Rows +// +-------------------------+-----------------------------------------+------------------+--------------------------------+-------------------------+ +// | Name: labels | Name: timestamp | Name: body | Name: id | Name: labelTypes | +// | Labels: | Labels: | Labels: | Labels: | Labels: | +// | Type: []json.RawMessage | Type: []time.Time | Type: []string | Type: []string | Type: []json.RawMessage | +// +-------------------------+-----------------------------------------+------------------+--------------------------------+-------------------------+ +// | {} | 2022-02-16 16:50:44.81075712 +0000 UTC | log line error 1 | 1645030244810757120_7bc6af55 | {} | +// | {} | 2022-02-16 16:50:47.02773504 +0000 UTC | log line info 1 | 1645030247027735040_2c9e69b9 | {} | +// | {} | 2022-02-16 16:50:46.277587968 +0000 UTC | log line info 2 | 1645030246277587968_b7784d5c | {} | +// | {} | 2022-02-16 16:50:46.277587968 +0000 UTC | log line info 2 | 1645030246277587968_b7784d5c_1 | {} | +// | {} | 2022-02-16 16:50:45.539423744 +0000 UTC | log line info 3 | 1645030245539423744_4aee7aab | {} | +// | {} | 2022-02-16 16:50:44.091700992 +0000 UTC | log line info 4 | 1645030244091700992_df059f36 | {} | +// +-------------------------+-----------------------------------------+------------------+--------------------------------+-------------------------+ +// +// +// 🌟 This was machine generated. Do not edit. 🌟 +{ + "status": 200, + "frames": [ + { + "schema": { + "meta": { + "type": "log-lines", + "typeVersion": [ + 0, + 0 + ], + "stats": [ + { + "displayName": "Summary: bytes processed per second", + "unit": "Bps", + "value": 3507022 + }, + { + "displayName": "Summary: lines processed per second", + "value": 24818 + }, + { + "displayName": "Summary: total bytes processed", + "unit": "decbytes", + "value": 7772 + }, + { + "displayName": "Summary: total lines processed", + "value": 55 + }, + { + "displayName": "Summary: exec time", + "unit": "s", + "value": 0.002216125 + }, + { + "displayName": "Store: total chunks ref", + "value": 2 + }, + { + "displayName": "Store: total chunks downloaded", + "value": 3 + }, + { + "displayName": "Store: chunks download time", + "unit": "s", + "value": 0.000390958 + }, + { + "displayName": "Store: head chunk bytes", + "unit": "decbytes", + "value": 4 + }, + { + "displayName": "Store: head chunk lines", + "value": 5 + }, + { + "displayName": "Store: decompressed bytes", + "unit": "decbytes", + "value": 7772 + }, + { + "displayName": "Store: decompressed lines", + "value": 55 + }, + { + "displayName": "Store: compressed bytes", + "unit": "decbytes", + "value": 31432 + }, + { + "displayName": "Store: total duplicates", + "value": 6 + }, + { + "displayName": "Ingester: total reached", + "value": 7 + }, + { + "displayName": "Ingester: total chunks matched", + "value": 8 + }, + { + "displayName": "Ingester: total batches", + "value": 9 + }, + { + "displayName": "Ingester: total lines sent", + "value": 10 + }, + { + "displayName": "Ingester: head chunk bytes", + "unit": "decbytes", + "value": 11 + }, + { + "displayName": "Ingester: head chunk lines", + "value": 12 + }, + { + "displayName": "Ingester: decompressed bytes", + "unit": "decbytes", + "value": 13 + }, + { + "displayName": "Ingester: decompressed lines", + "value": 14 + }, + { + "displayName": "Ingester: compressed bytes", + "unit": "decbytes", + "value": 15 + }, + { + "displayName": "Ingester: total duplicates", + "value": 16 + } + ], + "executedQueryString": "Expr: query1" + }, + "fields": [ + { + "name": "labels", + "type": "other", + "typeInfo": { + "frame": "json.RawMessage" + } + }, + { + "name": "timestamp", + "type": "time", + "typeInfo": { + "frame": "time.Time" + } + }, + { + "name": "body", + "type": "string", + "typeInfo": { + "frame": "string" + } + }, + { + "name": "id", + "type": "string", + "typeInfo": { + "frame": "string" + } + }, + { + "name": "labelTypes", + "type": "other", + "typeInfo": { + "frame": "json.RawMessage" + }, + "config": { + "custom": { + "hidden": true + } + } + } + ] + }, + "data": { + "values": [ + [ + {}, + {}, + {}, + {}, + {}, + {} + ], + [ + 1645030244810, + 1645030247027, + 1645030246277, + 1645030246277, + 1645030245539, + 1645030244091 + ], + [ + "log line error 1", + "log line info 1", + "log line info 2", + "log line info 2", + "log line info 3", + "log line info 4" + ], + [ + "1645030244810757120_7bc6af55", + "1645030247027735040_2c9e69b9", + "1645030246277587968_b7784d5c", + "1645030246277587968_b7784d5c_1", + "1645030245539423744_4aee7aab", + "1645030244091700992_df059f36" + ], + [ + {}, + {}, + {}, + {}, + {}, + {} + ] + ], + "nanos": [ + null, + [ + 757120, + 735040, + 587968, + 587968, + 423744, + 700992 + ], + null, + null, + null + ] + } + } + ] +} \ No newline at end of file diff --git a/pkg/tsdb/loki/testdata_dataplane/streams_structured_metadata.json b/pkg/tsdb/loki/testdata_dataplane/streams_structured_metadata.json new file mode 100644 index 00000000000..760e15ca712 --- /dev/null +++ b/pkg/tsdb/loki/testdata_dataplane/streams_structured_metadata.json @@ -0,0 +1,89 @@ +{ + "status": "success", + "data": { + "encodingFlags": [ + "categorize-labels" + ], + "resultType": "streams", + "result": [ + { + "stream": { + "code": "one\",", + "location": "moonπŸŒ™" + }, + "values": [ + [ + "1645030244810757120", + "log line error 1", + {} + ] + ] + }, + { + "stream": { + "code": "\",two", + "location": "moonπŸŒ™" + }, + "values": [ + [ + "1645030247027735040", + "log line info 1", + {} + ], + [ + "1645030246277587968", + "log line info 2", + {} + ], + [ + "1645030246277587968", + "log line info 2", + {} + ], + [ + "1645030245539423744", + "log line info 3", + {} + ], + [ + "1645030244091700992", + "log line info 4", + {} + ] + ] + } + ], + "stats": { + "summary": { + "bytesProcessedPerSecond": 3507022, + "linesProcessedPerSecond": 24818, + "totalBytesProcessed": 7772, + "totalLinesProcessed": 55, + "execTime": 0.002216125 + }, + "store": { + "totalChunksRef": 2, + "totalChunksDownloaded": 3, + "chunksDownloadTime": 0.000390958, + "headChunkBytes": 4, + "headChunkLines": 5, + "decompressedBytes": 7772, + "decompressedLines": 55, + "compressedBytes": 31432, + "totalDuplicates": 6 + }, + "ingester": { + "totalReached": 7, + "totalChunksMatched": 8, + "totalBatches": 9, + "totalLinesSent": 10, + "headChunkBytes": 11, + "headChunkLines": 12, + "decompressedBytes": 13, + "decompressedLines": 14, + "compressedBytes": 15, + "totalDuplicates": 16 + } + } + } +} diff --git a/pkg/tsdb/loki/testdata_logs_dataplane/streams_structured_metadata.golden.jsonc b/pkg/tsdb/loki/testdata_logs_dataplane/streams_structured_metadata.golden.jsonc new file mode 100644 index 00000000000..43884ad2c87 --- /dev/null +++ b/pkg/tsdb/loki/testdata_logs_dataplane/streams_structured_metadata.golden.jsonc @@ -0,0 +1,360 @@ +// 🌟 This was machine generated. Do not edit. 🌟 +// +// Frame[0] { +// "type": "log-lines", +// "typeVersion": [ +// 0, +// 0 +// ], +// "stats": [ +// { +// "displayName": "Summary: bytes processed per second", +// "unit": "Bps", +// "value": 3507022 +// }, +// { +// "displayName": "Summary: lines processed per second", +// "value": 24818 +// }, +// { +// "displayName": "Summary: total bytes processed", +// "unit": "decbytes", +// "value": 7772 +// }, +// { +// "displayName": "Summary: total lines processed", +// "value": 55 +// }, +// { +// "displayName": "Summary: exec time", +// "unit": "s", +// "value": 0.002216125 +// }, +// { +// "displayName": "Store: total chunks ref", +// "value": 2 +// }, +// { +// "displayName": "Store: total chunks downloaded", +// "value": 3 +// }, +// { +// "displayName": "Store: chunks download time", +// "unit": "s", +// "value": 0.000390958 +// }, +// { +// "displayName": "Store: head chunk bytes", +// "unit": "decbytes", +// "value": 4 +// }, +// { +// "displayName": "Store: head chunk lines", +// "value": 5 +// }, +// { +// "displayName": "Store: decompressed bytes", +// "unit": "decbytes", +// "value": 7772 +// }, +// { +// "displayName": "Store: decompressed lines", +// "value": 55 +// }, +// { +// "displayName": "Store: compressed bytes", +// "unit": "decbytes", +// "value": 31432 +// }, +// { +// "displayName": "Store: total duplicates", +// "value": 6 +// }, +// { +// "displayName": "Ingester: total reached", +// "value": 7 +// }, +// { +// "displayName": "Ingester: total chunks matched", +// "value": 8 +// }, +// { +// "displayName": "Ingester: total batches", +// "value": 9 +// }, +// { +// "displayName": "Ingester: total lines sent", +// "value": 10 +// }, +// { +// "displayName": "Ingester: head chunk bytes", +// "unit": "decbytes", +// "value": 11 +// }, +// { +// "displayName": "Ingester: head chunk lines", +// "value": 12 +// }, +// { +// "displayName": "Ingester: decompressed bytes", +// "unit": "decbytes", +// "value": 13 +// }, +// { +// "displayName": "Ingester: decompressed lines", +// "value": 14 +// }, +// { +// "displayName": "Ingester: compressed bytes", +// "unit": "decbytes", +// "value": 15 +// }, +// { +// "displayName": "Ingester: total duplicates", +// "value": 16 +// } +// ], +// "executedQueryString": "Expr: query1" +// } +// Name: +// Dimensions: 5 Fields by 6 Rows +// +-------------------------+-----------------------------------------+------------------+--------------------------------+-------------------------+ +// | Name: labels | Name: timestamp | Name: body | Name: id | Name: labelTypes | +// | Labels: | Labels: | Labels: | Labels: | Labels: | +// | Type: []json.RawMessage | Type: []time.Time | Type: []string | Type: []string | Type: []json.RawMessage | +// +-------------------------+-----------------------------------------+------------------+--------------------------------+-------------------------+ +// | {} | 2022-02-16 16:50:44.81075712 +0000 UTC | log line error 1 | 1645030244810757120_7bc6af55 | {} | +// | {} | 2022-02-16 16:50:47.02773504 +0000 UTC | log line info 1 | 1645030247027735040_2c9e69b9 | {} | +// | {} | 2022-02-16 16:50:46.277587968 +0000 UTC | log line info 2 | 1645030246277587968_b7784d5c | {} | +// | {} | 2022-02-16 16:50:46.277587968 +0000 UTC | log line info 2 | 1645030246277587968_b7784d5c_1 | {} | +// | {} | 2022-02-16 16:50:45.539423744 +0000 UTC | log line info 3 | 1645030245539423744_4aee7aab | {} | +// | {} | 2022-02-16 16:50:44.091700992 +0000 UTC | log line info 4 | 1645030244091700992_df059f36 | {} | +// +-------------------------+-----------------------------------------+------------------+--------------------------------+-------------------------+ +// +// +// 🌟 This was machine generated. Do not edit. 🌟 +{ + "status": 200, + "frames": [ + { + "schema": { + "meta": { + "type": "log-lines", + "typeVersion": [ + 0, + 0 + ], + "stats": [ + { + "displayName": "Summary: bytes processed per second", + "unit": "Bps", + "value": 3507022 + }, + { + "displayName": "Summary: lines processed per second", + "value": 24818 + }, + { + "displayName": "Summary: total bytes processed", + "unit": "decbytes", + "value": 7772 + }, + { + "displayName": "Summary: total lines processed", + "value": 55 + }, + { + "displayName": "Summary: exec time", + "unit": "s", + "value": 0.002216125 + }, + { + "displayName": "Store: total chunks ref", + "value": 2 + }, + { + "displayName": "Store: total chunks downloaded", + "value": 3 + }, + { + "displayName": "Store: chunks download time", + "unit": "s", + "value": 0.000390958 + }, + { + "displayName": "Store: head chunk bytes", + "unit": "decbytes", + "value": 4 + }, + { + "displayName": "Store: head chunk lines", + "value": 5 + }, + { + "displayName": "Store: decompressed bytes", + "unit": "decbytes", + "value": 7772 + }, + { + "displayName": "Store: decompressed lines", + "value": 55 + }, + { + "displayName": "Store: compressed bytes", + "unit": "decbytes", + "value": 31432 + }, + { + "displayName": "Store: total duplicates", + "value": 6 + }, + { + "displayName": "Ingester: total reached", + "value": 7 + }, + { + "displayName": "Ingester: total chunks matched", + "value": 8 + }, + { + "displayName": "Ingester: total batches", + "value": 9 + }, + { + "displayName": "Ingester: total lines sent", + "value": 10 + }, + { + "displayName": "Ingester: head chunk bytes", + "unit": "decbytes", + "value": 11 + }, + { + "displayName": "Ingester: head chunk lines", + "value": 12 + }, + { + "displayName": "Ingester: decompressed bytes", + "unit": "decbytes", + "value": 13 + }, + { + "displayName": "Ingester: decompressed lines", + "value": 14 + }, + { + "displayName": "Ingester: compressed bytes", + "unit": "decbytes", + "value": 15 + }, + { + "displayName": "Ingester: total duplicates", + "value": 16 + } + ], + "executedQueryString": "Expr: query1" + }, + "fields": [ + { + "name": "labels", + "type": "other", + "typeInfo": { + "frame": "json.RawMessage" + } + }, + { + "name": "timestamp", + "type": "time", + "typeInfo": { + "frame": "time.Time" + } + }, + { + "name": "body", + "type": "string", + "typeInfo": { + "frame": "string" + } + }, + { + "name": "id", + "type": "string", + "typeInfo": { + "frame": "string" + } + }, + { + "name": "labelTypes", + "type": "other", + "typeInfo": { + "frame": "json.RawMessage" + }, + "config": { + "custom": { + "hidden": true + } + } + } + ] + }, + "data": { + "values": [ + [ + {}, + {}, + {}, + {}, + {}, + {} + ], + [ + 1645030244810, + 1645030247027, + 1645030246277, + 1645030246277, + 1645030245539, + 1645030244091 + ], + [ + "log line error 1", + "log line info 1", + "log line info 2", + "log line info 2", + "log line info 3", + "log line info 4" + ], + [ + "1645030244810757120_7bc6af55", + "1645030247027735040_2c9e69b9", + "1645030246277587968_b7784d5c", + "1645030246277587968_b7784d5c_1", + "1645030245539423744_4aee7aab", + "1645030244091700992_df059f36" + ], + [ + {}, + {}, + {}, + {}, + {}, + {} + ] + ], + "nanos": [ + null, + [ + 757120, + 735040, + 587968, + 587968, + 423744, + 700992 + ], + null, + null, + null + ] + } + } + ] +} \ No newline at end of file diff --git a/pkg/tsdb/loki/testdata_logs_dataplane/streams_structured_metadata.json b/pkg/tsdb/loki/testdata_logs_dataplane/streams_structured_metadata.json new file mode 100644 index 00000000000..760e15ca712 --- /dev/null +++ b/pkg/tsdb/loki/testdata_logs_dataplane/streams_structured_metadata.json @@ -0,0 +1,89 @@ +{ + "status": "success", + "data": { + "encodingFlags": [ + "categorize-labels" + ], + "resultType": "streams", + "result": [ + { + "stream": { + "code": "one\",", + "location": "moonπŸŒ™" + }, + "values": [ + [ + "1645030244810757120", + "log line error 1", + {} + ] + ] + }, + { + "stream": { + "code": "\",two", + "location": "moonπŸŒ™" + }, + "values": [ + [ + "1645030247027735040", + "log line info 1", + {} + ], + [ + "1645030246277587968", + "log line info 2", + {} + ], + [ + "1645030246277587968", + "log line info 2", + {} + ], + [ + "1645030245539423744", + "log line info 3", + {} + ], + [ + "1645030244091700992", + "log line info 4", + {} + ] + ] + } + ], + "stats": { + "summary": { + "bytesProcessedPerSecond": 3507022, + "linesProcessedPerSecond": 24818, + "totalBytesProcessed": 7772, + "totalLinesProcessed": 55, + "execTime": 0.002216125 + }, + "store": { + "totalChunksRef": 2, + "totalChunksDownloaded": 3, + "chunksDownloadTime": 0.000390958, + "headChunkBytes": 4, + "headChunkLines": 5, + "decompressedBytes": 7772, + "decompressedLines": 55, + "compressedBytes": 31432, + "totalDuplicates": 6 + }, + "ingester": { + "totalReached": 7, + "totalChunksMatched": 8, + "totalBatches": 9, + "totalLinesSent": 10, + "headChunkBytes": 11, + "headChunkLines": 12, + "decompressedBytes": 13, + "decompressedLines": 14, + "compressedBytes": 15, + "totalDuplicates": 16 + } + } + } +} diff --git a/pkg/tsdb/loki/testdata_metric_dataplane/streams_structured_metadata.golden.jsonc b/pkg/tsdb/loki/testdata_metric_dataplane/streams_structured_metadata.golden.jsonc new file mode 100644 index 00000000000..6882e798623 --- /dev/null +++ b/pkg/tsdb/loki/testdata_metric_dataplane/streams_structured_metadata.golden.jsonc @@ -0,0 +1,348 @@ +// 🌟 This was machine generated. Do not edit. 🌟 +// +// Frame[0] { +// "typeVersion": [ +// 0, +// 0 +// ], +// "custom": { +// "frameType": "LabeledTimeValues" +// }, +// "stats": [ +// { +// "displayName": "Summary: bytes processed per second", +// "unit": "Bps", +// "value": 3507022 +// }, +// { +// "displayName": "Summary: lines processed per second", +// "value": 24818 +// }, +// { +// "displayName": "Summary: total bytes processed", +// "unit": "decbytes", +// "value": 7772 +// }, +// { +// "displayName": "Summary: total lines processed", +// "value": 55 +// }, +// { +// "displayName": "Summary: exec time", +// "unit": "s", +// "value": 0.002216125 +// }, +// { +// "displayName": "Store: total chunks ref", +// "value": 2 +// }, +// { +// "displayName": "Store: total chunks downloaded", +// "value": 3 +// }, +// { +// "displayName": "Store: chunks download time", +// "unit": "s", +// "value": 0.000390958 +// }, +// { +// "displayName": "Store: head chunk bytes", +// "unit": "decbytes", +// "value": 4 +// }, +// { +// "displayName": "Store: head chunk lines", +// "value": 5 +// }, +// { +// "displayName": "Store: decompressed bytes", +// "unit": "decbytes", +// "value": 7772 +// }, +// { +// "displayName": "Store: decompressed lines", +// "value": 55 +// }, +// { +// "displayName": "Store: compressed bytes", +// "unit": "decbytes", +// "value": 31432 +// }, +// { +// "displayName": "Store: total duplicates", +// "value": 6 +// }, +// { +// "displayName": "Ingester: total reached", +// "value": 7 +// }, +// { +// "displayName": "Ingester: total chunks matched", +// "value": 8 +// }, +// { +// "displayName": "Ingester: total batches", +// "value": 9 +// }, +// { +// "displayName": "Ingester: total lines sent", +// "value": 10 +// }, +// { +// "displayName": "Ingester: head chunk bytes", +// "unit": "decbytes", +// "value": 11 +// }, +// { +// "displayName": "Ingester: head chunk lines", +// "value": 12 +// }, +// { +// "displayName": "Ingester: decompressed bytes", +// "unit": "decbytes", +// "value": 13 +// }, +// { +// "displayName": "Ingester: decompressed lines", +// "value": 14 +// }, +// { +// "displayName": "Ingester: compressed bytes", +// "unit": "decbytes", +// "value": 15 +// }, +// { +// "displayName": "Ingester: total duplicates", +// "value": 16 +// } +// ], +// "executedQueryString": "Expr: query1" +// } +// Name: +// Dimensions: 6 Fields by 2 Rows +// +-------------------------+----------------------------------------+------------------+---------------------+-------------------------+------------------------------+ +// | Name: labels | Name: Time | Name: Line | Name: tsNs | Name: labelTypes | Name: id | +// | Labels: | Labels: | Labels: | Labels: | Labels: | Labels: | +// | Type: []json.RawMessage | Type: []time.Time | Type: []string | Type: []string | Type: []json.RawMessage | Type: []string | +// +-------------------------+----------------------------------------+------------------+---------------------+-------------------------+------------------------------+ +// | {} | 2022-02-16 16:50:44.81075712 +0000 UTC | log line error 1 | 1645030244810757120 | {} | 1645030244810757120_7bc6af55 | +// | {} | 2022-02-16 16:50:47.02773504 +0000 UTC | log line info 1 | 1645030247027735040 | {} | 1645030247027735040_2c9e69b9 | +// +-------------------------+----------------------------------------+------------------+---------------------+-------------------------+------------------------------+ +// +// +// 🌟 This was machine generated. Do not edit. 🌟 +{ + "status": 200, + "frames": [ + { + "schema": { + "meta": { + "typeVersion": [ + 0, + 0 + ], + "custom": { + "frameType": "LabeledTimeValues" + }, + "stats": [ + { + "displayName": "Summary: bytes processed per second", + "unit": "Bps", + "value": 3507022 + }, + { + "displayName": "Summary: lines processed per second", + "value": 24818 + }, + { + "displayName": "Summary: total bytes processed", + "unit": "decbytes", + "value": 7772 + }, + { + "displayName": "Summary: total lines processed", + "value": 55 + }, + { + "displayName": "Summary: exec time", + "unit": "s", + "value": 0.002216125 + }, + { + "displayName": "Store: total chunks ref", + "value": 2 + }, + { + "displayName": "Store: total chunks downloaded", + "value": 3 + }, + { + "displayName": "Store: chunks download time", + "unit": "s", + "value": 0.000390958 + }, + { + "displayName": "Store: head chunk bytes", + "unit": "decbytes", + "value": 4 + }, + { + "displayName": "Store: head chunk lines", + "value": 5 + }, + { + "displayName": "Store: decompressed bytes", + "unit": "decbytes", + "value": 7772 + }, + { + "displayName": "Store: decompressed lines", + "value": 55 + }, + { + "displayName": "Store: compressed bytes", + "unit": "decbytes", + "value": 31432 + }, + { + "displayName": "Store: total duplicates", + "value": 6 + }, + { + "displayName": "Ingester: total reached", + "value": 7 + }, + { + "displayName": "Ingester: total chunks matched", + "value": 8 + }, + { + "displayName": "Ingester: total batches", + "value": 9 + }, + { + "displayName": "Ingester: total lines sent", + "value": 10 + }, + { + "displayName": "Ingester: head chunk bytes", + "unit": "decbytes", + "value": 11 + }, + { + "displayName": "Ingester: head chunk lines", + "value": 12 + }, + { + "displayName": "Ingester: decompressed bytes", + "unit": "decbytes", + "value": 13 + }, + { + "displayName": "Ingester: decompressed lines", + "value": 14 + }, + { + "displayName": "Ingester: compressed bytes", + "unit": "decbytes", + "value": 15 + }, + { + "displayName": "Ingester: total duplicates", + "value": 16 + } + ], + "executedQueryString": "Expr: query1" + }, + "fields": [ + { + "name": "labels", + "type": "other", + "typeInfo": { + "frame": "json.RawMessage" + } + }, + { + "name": "Time", + "type": "time", + "typeInfo": { + "frame": "time.Time" + } + }, + { + "name": "Line", + "type": "string", + "typeInfo": { + "frame": "string" + } + }, + { + "name": "tsNs", + "type": "string", + "typeInfo": { + "frame": "string" + } + }, + { + "name": "labelTypes", + "type": "other", + "typeInfo": { + "frame": "json.RawMessage" + }, + "config": { + "custom": { + "hidden": true + } + } + }, + { + "name": "id", + "type": "string", + "typeInfo": { + "frame": "string" + } + } + ] + }, + "data": { + "values": [ + [ + {}, + {} + ], + [ + 1645030244810, + 1645030247027 + ], + [ + "log line error 1", + "log line info 1" + ], + [ + "1645030244810757120", + "1645030247027735040" + ], + [ + {}, + {} + ], + [ + "1645030244810757120_7bc6af55", + "1645030247027735040_2c9e69b9" + ] + ], + "nanos": [ + null, + [ + 757120, + 735040 + ], + null, + null, + null, + null + ] + } + } + ] +} \ No newline at end of file diff --git a/pkg/tsdb/loki/testdata_metric_dataplane/streams_structured_metadata.json b/pkg/tsdb/loki/testdata_metric_dataplane/streams_structured_metadata.json new file mode 100644 index 00000000000..f8dcaca0b19 --- /dev/null +++ b/pkg/tsdb/loki/testdata_metric_dataplane/streams_structured_metadata.json @@ -0,0 +1,69 @@ +{ + "status": "success", + "data": { + "encodingFlags": [ + "categorize-labels" + ], + "resultType": "streams", + "result": [ + { + "stream": { + "code": "one\",", + "location": "moonπŸŒ™" + }, + "values": [ + [ + "1645030244810757120", + "log line error 1", + {} + ] + ] + }, + { + "stream": { + "code": "\",two", + "location": "moonπŸŒ™" + }, + "values": [ + [ + "1645030247027735040", + "log line info 1", + {} + ] + ] + } + ], + "stats": { + "summary": { + "bytesProcessedPerSecond": 3507022, + "linesProcessedPerSecond": 24818, + "totalBytesProcessed": 7772, + "totalLinesProcessed": 55, + "execTime": 0.002216125 + }, + "store": { + "totalChunksRef": 2, + "totalChunksDownloaded": 3, + "chunksDownloadTime": 0.000390958, + "headChunkBytes": 4, + "headChunkLines": 5, + "decompressedBytes": 7772, + "decompressedLines": 55, + "compressedBytes": 31432, + "totalDuplicates": 6 + }, + "ingester": { + "totalReached": 7, + "totalChunksMatched": 8, + "totalBatches": 9, + "totalLinesSent": 10, + "headChunkBytes": 11, + "headChunkLines": 12, + "decompressedBytes": 13, + "decompressedLines": 14, + "compressedBytes": 15, + "totalDuplicates": 16 + } + } + } +} diff --git a/pkg/util/converter/prom.go b/pkg/util/converter/prom.go index 1f6bdb67b8b..9b847aa51cb 100644 --- a/pkg/util/converter/prom.go +++ b/pkg/util/converter/prom.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" jsoniter "github.com/json-iterator/go" + "golang.org/x/exp/slices" "github.com/grafana/grafana/pkg/util/converter/jsonitere" ) @@ -157,12 +158,28 @@ func readPrometheusData(iter *jsonitere.Iterator, opt Options) backend.DataRespo resultTypeFound := false var resultBytes []byte + encodingFlags := make([]string, 0) + l1Fields: for l1Field, err := iter.ReadObject(); ; l1Field, err = iter.ReadObject() { if err != nil { return rspErr(err) } switch l1Field { + case "encodingFlags": + for ok, err := iter.ReadArray(); ok; ok, err = iter.ReadArray() { + if err != nil { + return rspErr(err) + } + encodingFlag, err := iter.ReadString() + if err != nil { + return rspErr(err) + } + encodingFlags = append(encodingFlags, encodingFlag) + } + if err != nil { + return rspErr(err) + } case "resultType": resultType, err = iter.ReadString() if err != nil { @@ -174,14 +191,14 @@ l1Fields: // we saved them because when we had them we don't know the resultType if len(resultBytes) > 0 { ji := jsonitere.NewIterator(jsoniter.ParseBytes(jsoniter.ConfigDefault, resultBytes)) - rsp = readResult(resultType, rsp, ji, opt) + rsp = readResult(resultType, rsp, ji, opt, encodingFlags) } case "result": // for some rare cases resultType is coming after the result. // when that happens we save the bytes and parse them after reading resultType // see: https://github.com/grafana/grafana/issues/64693 if resultTypeFound { - rsp = readResult(resultType, rsp, iter, opt) + rsp = readResult(resultType, rsp, iter, opt, encodingFlags) } else { resultBytes = iter.SkipAndReturnBytes() } @@ -224,7 +241,7 @@ l1Fields: } // will read the result object based on the resultType and return a DataResponse -func readResult(resultType string, rsp backend.DataResponse, iter *jsonitere.Iterator, opt Options) backend.DataResponse { +func readResult(resultType string, rsp backend.DataResponse, iter *jsonitere.Iterator, opt Options, encodingFlags []string) backend.DataResponse { switch resultType { case "matrix", "vector": rsp = readMatrixOrVectorMulti(iter, resultType, opt) @@ -232,7 +249,11 @@ func readResult(resultType string, rsp backend.DataResponse, iter *jsonitere.Ite return rsp } case "streams": - rsp = readStream(iter) + if slices.Contains(encodingFlags, "categorize-labels") { + rsp = readCategorizedStream(iter) + } else { + rsp = readStream(iter) + } if rsp.Error != nil { return rsp } @@ -925,6 +946,173 @@ func readStream(iter *jsonitere.Iterator) backend.DataResponse { return rsp } +func readCategorizedStream(iter *jsonitere.Iterator) backend.DataResponse { + rsp := backend.DataResponse{} + + labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0) + labelsField.Name = "__labels" // avoid automatically spreading this by labels + + labelTypesField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0) + labelTypesField.Name = "__labelTypes" // avoid automatically spreading this by labels + + timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0) + timeField.Name = "Time" + + lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0) + lineField.Name = "Line" + + // Nanoseconds time field + tsField := data.NewFieldFromFieldType(data.FieldTypeString, 0) + tsField.Name = "TS" + + labels := data.Labels{} + + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { + if err != nil { + rspErr(err) + } + + l1Fields: + for l1Field, err := iter.ReadObject(); ; l1Field, err = iter.ReadObject() { + if err != nil { + return rspErr(err) + } + switch l1Field { + case "stream": + // we need to clear `labels`, because `iter.ReadVal` + // only appends to it + labels := data.Labels{} + if err = iter.ReadVal(&labels); err != nil { + return rspErr(err) + } + + case "values": + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { + if err != nil { + rsp.Error = err + return rsp + } + + if _, err = iter.ReadArray(); err != nil { + return rspErr(err) + } + + ts, err := iter.ReadString() + if err != nil { + return rspErr(err) + } + + if _, err = iter.ReadArray(); err != nil { + return rspErr(err) + } + + line, err := iter.ReadString() + if err != nil { + return rspErr(err) + } + + if _, err = iter.ReadArray(); err != nil { + return rspErr(err) + } + + parsedLabelsMap, structuredMetadataMap, err := readCategorizedStreamField(iter) + if err != nil { + return rspErr(err) + } + + if _, err = iter.ReadArray(); err != nil { + return rspErr(err) + } + + t, err := timeFromLokiString(ts) + if err != nil { + return rspErr(err) + } + + typeMap := data.Labels{} + + for k := range labels { + typeMap[k] = "I" + } + + // merge all labels (indexed, parsed, structuredMetadata) into one dataframe field + for k, v := range structuredMetadataMap { + labels[k] = fmt.Sprintf("%s", v) + typeMap[k] = "S" + } + + for k, v := range parsedLabelsMap { + labels[k] = fmt.Sprintf("%s", v) + typeMap[k] = "P" + } + + labelJson, err := labelsToRawJson(labels) + if err != nil { + return rspErr(err) + } + + labelTypesJson, err := labelsToRawJson(typeMap) + if err != nil { + return rspErr(err) + } + + labelsField.Append(labelJson) + labelTypesField.Append(labelTypesJson) + timeField.Append(t) + lineField.Append(line) + tsField.Append(ts) + } + case "": + if err != nil { + return rspErr(err) + } + break l1Fields + } + } + } + + frame := data.NewFrame("", labelsField, timeField, lineField, tsField, labelTypesField) + frame.Meta = &data.FrameMeta{} + rsp.Frames = append(rsp.Frames, frame) + + return rsp +} + +func readCategorizedStreamField(iter *jsonitere.Iterator) (map[string]interface{}, map[string]interface{}, error) { + parsedLabels := data.Labels{} + structuredMetadata := data.Labels{} + var parsedLabelsMap map[string]interface{} + var structuredMetadataMap map[string]interface{} +streamField: + for streamField, err := iter.ReadObject(); ; streamField, err = iter.ReadObject() { + if err != nil { + return nil, nil, err + } + switch streamField { + case "parsed": + if err = iter.ReadVal(&parsedLabels); err != nil { + return nil, nil, err + } + case "structuredMetadata": + if err = iter.ReadVal(&structuredMetadata); err != nil { + return nil, nil, err + } + case "": + if err != nil { + return nil, nil, err + } + if parsedLabelsMap, err = labelsToMap(parsedLabels); err != nil { + return nil, nil, err + } + if structuredMetadataMap, err = labelsToMap(structuredMetadata); err != nil { + return nil, nil, err + } + break streamField + } + } + return parsedLabelsMap, structuredMetadataMap, nil +} + func resultTypeToCustomMeta(resultType string) map[string]string { return map[string]string{"resultType": resultType} } @@ -963,3 +1151,19 @@ func labelsToRawJson(labels data.Labels) (json.RawMessage, error) { return json.RawMessage(bytes), nil } + +func labelsToMap(labels data.Labels) (map[string]interface{}, error) { + // data.Labels when converted to JSON keep the fields sorted + labelJson, err := labelsToRawJson(labels) + if err != nil { + return nil, err + } + + var labelMap map[string]interface{} + err = jsoniter.Unmarshal(labelJson, &labelMap) + if err != nil { + return nil, err + } + + return labelMap, nil +} diff --git a/pkg/util/converter/prom_test.go b/pkg/util/converter/prom_test.go index 5ea6639acec..18908c9a0cd 100644 --- a/pkg/util/converter/prom_test.go +++ b/pkg/util/converter/prom_test.go @@ -67,9 +67,6 @@ func TestReadLimited(t *testing.T) { } } -// FIXME: -// -//lint:ignore U1000 Ignore used function for now func runScenario(name string, opts Options) func(t *testing.T) { return func(t *testing.T) { // Safe to disable, this is a test. diff --git a/pkg/util/converter/testdata/loki-streams-structured-metadata-frame.jsonc b/pkg/util/converter/testdata/loki-streams-structured-metadata-frame.jsonc new file mode 100644 index 00000000000..15780199262 --- /dev/null +++ b/pkg/util/converter/testdata/loki-streams-structured-metadata-frame.jsonc @@ -0,0 +1,97 @@ +// 🌟 This was machine generated. Do not edit. 🌟 +// +// Frame[0] { +// "typeVersion": [ +// 0, +// 0 +// ] +// } +// Name: +// Dimensions: 5 Fields by 1 Rows +// +----------------------------------------+-----------------------------------+----------------+---------------------+--------------------------------+ +// | Name: __labels | Name: Time | Name: Line | Name: TS | Name: __labelTypes | +// | Labels: | Labels: | Labels: | Labels: | Labels: | +// | Type: []json.RawMessage | Type: []time.Time | Type: []string | Type: []string | Type: []json.RawMessage | +// +----------------------------------------+-----------------------------------+----------------+---------------------+--------------------------------+ +// | {"label":"value","nonIndexed":"value"} | 2023-10-11 11:55:10.236 +0000 UTC | text | 1697025310236000000 | {"nonIndexed":"S","label":"I"} | +// +----------------------------------------+-----------------------------------+----------------+---------------------+--------------------------------+ +// +// +// 🌟 This was machine generated. Do not edit. 🌟 +{ + "status": 200, + "frames": [ + { + "schema": { + "meta": { + "typeVersion": [ + 0, + 0 + ] + }, + "fields": [ + { + "name": "__labels", + "type": "other", + "typeInfo": { + "frame": "json.RawMessage" + } + }, + { + "name": "Time", + "type": "time", + "typeInfo": { + "frame": "time.Time" + } + }, + { + "name": "Line", + "type": "string", + "typeInfo": { + "frame": "string" + } + }, + { + "name": "TS", + "type": "string", + "typeInfo": { + "frame": "string" + } + }, + { + "name": "__labelTypes", + "type": "other", + "typeInfo": { + "frame": "json.RawMessage" + } + } + ] + }, + "data": { + "values": [ + [ + { + "label": "value", + "nonIndexed": "value" + } + ], + [ + 1697025310236 + ], + [ + "text" + ], + [ + "1697025310236000000" + ], + [ + { + "nonIndexed": "S", + "label": "I" + } + ] + ] + } + } + ] +} \ No newline at end of file diff --git a/pkg/util/converter/testdata/loki-streams-structured-metadata.json b/pkg/util/converter/testdata/loki-streams-structured-metadata.json new file mode 100644 index 00000000000..88401d4cb5e --- /dev/null +++ b/pkg/util/converter/testdata/loki-streams-structured-metadata.json @@ -0,0 +1,27 @@ +{ + "status": "success", + "data": { + "encodingFlags": [ + "categorize-labels" + ], + "resultType": "streams", + "result": [ + { + "stream": { + "label": "value" + }, + "values": [ + [ + "1697025310236000000", + "text", + { + "structuredMetadata": { + "nonIndexed": "value" + } + } + ] + ] + } + ] + } +} \ No newline at end of file