diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts index ef0242c0389..c566a4b2998 100644 --- a/packages/grafana-data/src/types/featureToggles.gen.ts +++ b/packages/grafana-data/src/types/featureToggles.gen.ts @@ -40,6 +40,7 @@ export interface FeatureToggles { showFeatureFlagsInUI?: boolean; disable_http_request_histogram?: boolean; validatedQueries?: boolean; + lokiLive?: boolean; swaggerUi?: boolean; featureHighlights?: boolean; dashboardComments?: boolean; diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go index 8aba04eda6a..f0a542c248f 100644 --- a/pkg/services/featuremgmt/registry.go +++ b/pkg/services/featuremgmt/registry.go @@ -132,6 +132,11 @@ var ( State: FeatureStateAlpha, RequiresDevMode: true, }, + { + Name: "lokiLive", + Description: "support websocket streaming for loki (early prototype)", + State: FeatureStateAlpha, + }, { Name: "swaggerUi", Description: "Serves swagger UI", diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go index 70f7b58c3eb..bb15e26e660 100644 --- a/pkg/services/featuremgmt/toggles_gen.go +++ b/pkg/services/featuremgmt/toggles_gen.go @@ -99,6 +99,10 @@ const ( // only execute the query saved in a panel FlagValidatedQueries = "validatedQueries" + // FlagLokiLive + // support websocket streaming for loki (early prototype) + FlagLokiLive = "lokiLive" + // FlagSwaggerUi // Serves swagger UI FlagSwaggerUi = "swaggerUi" diff --git a/pkg/tsdb/loki/loki.go b/pkg/tsdb/loki/loki.go index c554386854f..59cc30eb7e1 100644 --- a/pkg/tsdb/loki/loki.go +++ b/pkg/tsdb/loki/loki.go @@ -2,9 +2,11 @@ package loki import ( "context" + "encoding/json" "fmt" "net/http" "regexp" + "sync" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" @@ -22,6 +24,11 @@ type Service struct { tracer tracing.Tracer } +var ( + _ backend.QueryDataHandler = (*Service)(nil) + _ backend.StreamHandler = (*Service)(nil) +) + func ProvideService(httpClientProvider httpclient.Provider, tracer tracing.Tracer) *Service { return &Service{ im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)), @@ -37,6 +44,10 @@ var ( type datasourceInfo struct { HTTPClient *http.Client URL string + + // open streams + streams map[string]data.FrameJSONCache + streamsMu sync.RWMutex } type QueryJSONModel struct { @@ -50,6 +61,12 @@ type QueryJSONModel struct { VolumeQuery bool `json:"volumeQuery"` } +func parseQueryModel(raw json.RawMessage) (*QueryJSONModel, error) { + model := &QueryJSONModel{} + err := json.Unmarshal(raw, model) + return model, err +} + func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc { return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { opts, err := settings.HTTPClientOptions() @@ -65,6 +82,7 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst model := &datasourceInfo{ HTTPClient: client, URL: settings.URL, + streams: make(map[string]data.FrameJSONCache), } return model, nil } diff --git a/pkg/tsdb/loki/parse_query.go b/pkg/tsdb/loki/parse_query.go index 2ebf64dee6f..d62802e5f06 100644 --- a/pkg/tsdb/loki/parse_query.go +++ b/pkg/tsdb/loki/parse_query.go @@ -1,7 +1,6 @@ package loki import ( - "encoding/json" "fmt" "math" "strconv" @@ -71,8 +70,7 @@ func parseQueryType(jsonValue string) (QueryType, error) { func parseQuery(queryContext *backend.QueryDataRequest) ([]*lokiQuery, error) { qs := []*lokiQuery{} for _, query := range queryContext.Queries { - model := &QueryJSONModel{} - err := json.Unmarshal(query.JSON, model) + model, err := parseQueryModel(query.JSON) if err != nil { return nil, err } diff --git a/pkg/tsdb/loki/streaming.go b/pkg/tsdb/loki/streaming.go new file mode 100644 index 00000000000..e2c6ff96180 --- /dev/null +++ b/pkg/tsdb/loki/streaming.go @@ -0,0 +1,200 @@ +package loki + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "os" + "os/signal" + "strings" + "time" + + "github.com/gorilla/websocket" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +func (s *Service) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + dsInfo, err := s.getDSInfo(req.PluginContext) + if err != nil { + return &backend.SubscribeStreamResponse{ + Status: backend.SubscribeStreamStatusNotFound, + }, err + } + + // Expect tail/${key} + if !strings.HasPrefix(req.Path, "tail/") { + return &backend.SubscribeStreamResponse{ + Status: backend.SubscribeStreamStatusNotFound, + }, fmt.Errorf("expected tail in channel path") + } + + query, err := parseQueryModel(req.Data) + if err != nil { + return nil, err + } + if query.Expr == "" { + return &backend.SubscribeStreamResponse{ + Status: backend.SubscribeStreamStatusNotFound, + }, fmt.Errorf("missing expr in channel (subscribe)") + } + + dsInfo.streamsMu.RLock() + defer dsInfo.streamsMu.RUnlock() + + cache, ok := dsInfo.streams[req.Path] + if ok { + msg, err := backend.NewInitialData(cache.Bytes(data.IncludeAll)) + return &backend.SubscribeStreamResponse{ + Status: backend.SubscribeStreamStatusOK, + InitialData: msg, + }, err + } + + // nothing yet + return &backend.SubscribeStreamResponse{ + Status: backend.SubscribeStreamStatusOK, + }, err +} + +// Single instance for each channel (results are shared with all listeners) +func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { + dsInfo, err := s.getDSInfo(req.PluginContext) + if err != nil { + return err + } + + query, err := parseQueryModel(req.Data) + if err != nil { + return err + } + if query.Expr == "" { + return fmt.Errorf("missing expr in cuannel") + } + + count := int64(0) + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + params := url.Values{} + params.Add("query", query.Expr) + + isV1 := false + wsurl, _ := url.Parse(dsInfo.URL) + + // Check if the v2alpha endpoint exists + wsurl.Path = "/loki/api/v2alpha/tail" + if !is400(dsInfo.HTTPClient, wsurl) { + isV1 = true + wsurl.Path = "/loki/api/v1/tail" + } + + if wsurl.Scheme == "https" { + wsurl.Scheme = "wss" + } else { + wsurl.Scheme = "ws" + } + wsurl.RawQuery = params.Encode() + + s.plog.Info("connecting to websocket", "url", wsurl) + c, r, err := websocket.DefaultDialer.Dial(wsurl.String(), nil) + if err != nil { + s.plog.Error("error connecting to websocket", "err", err) + return fmt.Errorf("error connecting to websocket") + } + + defer func() { + dsInfo.streamsMu.Lock() + delete(dsInfo.streams, req.Path) + dsInfo.streamsMu.Unlock() + if r != nil { + _ = r.Body.Close() + } + err = c.Close() + s.plog.Error("closing loki websocket", "err", err) + }() + + prev := data.FrameJSONCache{} + + // Read all messages + done := make(chan struct{}) + go func() { + defer close(done) + for { + _, message, err := c.ReadMessage() + if err != nil { + s.plog.Error("websocket read:", "err", err) + return + } + + frame := &data.Frame{} + if isV1 { + frame, err = lokiBytesToLabeledFrame(message) + } else { + err = json.Unmarshal(message, &frame) + } + + if err == nil && frame != nil { + next, _ := data.FrameToJSONCache(frame) + if next.SameSchema(&prev) { + err = sender.SendBytes(next.Bytes(data.IncludeDataOnly)) + } else { + err = sender.SendFrame(frame, data.IncludeAll) + } + prev = next + + // Cache the initial data + dsInfo.streamsMu.Lock() + dsInfo.streams[req.Path] = prev + dsInfo.streamsMu.Unlock() + } + + if err != nil { + s.plog.Error("websocket write:", "err", err, "raw", message) + return + } + } + }() + + ticker := time.NewTicker(time.Second * 60) //.Step) + defer ticker.Stop() + + for { + select { + case <-done: + s.plog.Info("socket done") + return nil + case <-ctx.Done(): + s.plog.Info("stop streaming (context canceled)") + return nil + case t := <-ticker.C: + count++ + s.plog.Error("loki websocket ping?", "time", t, "count", count) + } + } +} + +func (s *Service) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + return &backend.PublishStreamResponse{ + Status: backend.PublishStreamStatusPermissionDenied, + }, nil +} + +// if the v2 endpoint exists it will give a 400 rather than 404/500 +func is400(client *http.Client, url *url.URL) bool { + req, err := http.NewRequest("GET", url.String(), nil) + if err != nil { + return false + } + rsp, err := client.Do(req) + if err != nil { + return false + } + defer func() { + _ = rsp.Body.Close() + }() + return rsp.StatusCode == 400 // will be true +} diff --git a/pkg/tsdb/loki/streaming_frame.go b/pkg/tsdb/loki/streaming_frame.go new file mode 100644 index 00000000000..e03cd298b7d --- /dev/null +++ b/pkg/tsdb/loki/streaming_frame.go @@ -0,0 +1,52 @@ +package loki + +import ( + "encoding/json" + "strconv" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +type lokiResponse struct { + Streams []lokiStream `json:"streams"` +} + +type lokiStream struct { + Stream data.Labels `json:"stream"` + Values [][2]string `json:"values"` +} + +func lokiBytesToLabeledFrame(msg []byte) (*data.Frame, error) { + rsp := &lokiResponse{} + err := json.Unmarshal(msg, rsp) + if err != nil { + return nil, err + } + + labelField := data.NewFieldFromFieldType(data.FieldTypeString, 0) + timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0) + lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0) + + labelField.Name = "__labels" // for now, avoid automatically spreading this by labels + timeField.Name = "Time" + lineField.Name = "Line" + + for _, stream := range rsp.Streams { + label := stream.Stream.String() // TODO -- make it match prom labels! + for _, value := range stream.Values { + n, err := strconv.ParseInt(value[0], 10, 64) + if err != nil { + continue + } + ts := time.Unix(0, n) + line := value[1] + + labelField.Append(label) + timeField.Append(ts) + lineField.Append(line) + } + } + + return data.NewFrame("", labelField, timeField, lineField), nil +} diff --git a/pkg/tsdb/loki/streaming_frame_test.go b/pkg/tsdb/loki/streaming_frame_test.go new file mode 100644 index 00000000000..659bc8320db --- /dev/null +++ b/pkg/tsdb/loki/streaming_frame_test.go @@ -0,0 +1,40 @@ +package loki + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestLokiFramer(t *testing.T) { + t.Run("converting metric name", func(t *testing.T) { + msg := []byte(`{"streams":[ + {"stream": + {"job":"node-exporter","metric":"go_memstats_heap_inuse_bytes"}, + "values":[ + ["1642091525267322910","line1"] + ]}, + {"stream": + {"job":"node-exporter","metric":"go_memstats_heap_inuse_bytes"}, + "values":[ + ["1642091525770585774","line2"], + ["1642091525770585775","line3"] + ]}, + {"stream": + {"metric":"go_memstats_heap_inuse_bytes","job":"node-exporter"}, + "values":[ + ["1642091526263785281","line4"] + ]} + ]}`) + + frame, err := lokiBytesToLabeledFrame(msg) + require.NoError(t, err) + + lines := frame.Fields[2] + require.Equal(t, 4, lines.Len()) + require.Equal(t, "line1", lines.At(0)) + require.Equal(t, "line2", lines.At(1)) + require.Equal(t, "line3", lines.At(2)) + require.Equal(t, "line4", lines.At(3)) + }) +} diff --git a/public/app/plugins/datasource/loki/components/LokiOptionFields.tsx b/public/app/plugins/datasource/loki/components/LokiOptionFields.tsx index ae66ec73ab4..17854499e85 100644 --- a/public/app/plugins/datasource/loki/components/LokiOptionFields.tsx +++ b/public/app/plugins/datasource/loki/components/LokiOptionFields.tsx @@ -6,6 +6,7 @@ import { map } from 'lodash'; // Types import { InlineFormLabel, RadioButtonGroup, InlineField, Input, Select } from '@grafana/ui'; import { SelectableValue } from '@grafana/data'; +import { config } from '@grafana/runtime'; import { LokiQuery, LokiQueryType } from '../types'; export interface LokiOptionFieldsProps { @@ -24,13 +25,16 @@ const queryTypeOptions: Array> = [ label: 'Instant', description: 'Run query against a single point in time. For this query, the "To" time is used.', }, - // { - // value: LokiQueryType.Stream, - // label: 'Stream', - // description: 'Run a query and keep sending results on an interval', - // }, ]; +if (config.featureToggles.lokiLive) { + queryTypeOptions.push({ + value: LokiQueryType.Stream, + label: 'Stream', + description: 'Run a query and keep sending results on an interval', + }); +} + export const DEFAULT_RESOLUTION: SelectableValue = { value: 1, label: '1/1', diff --git a/public/app/plugins/datasource/loki/datasource.ts b/public/app/plugins/datasource/loki/datasource.ts index 4d4b8332a37..4136a4fc770 100644 --- a/public/app/plugins/datasource/loki/datasource.ts +++ b/public/app/plugins/datasource/loki/datasource.ts @@ -33,7 +33,7 @@ import { ScopedVars, TimeRange, } from '@grafana/data'; -import { BackendSrvRequest, FetchError, getBackendSrv, DataSourceWithBackend } from '@grafana/runtime'; +import { BackendSrvRequest, FetchError, getBackendSrv, config, DataSourceWithBackend } from '@grafana/runtime'; import { getTemplateSrv, TemplateSrv } from 'app/features/templating/template_srv'; import { addLabelToQuery } from './add_label_to_query'; import { getTimeSrv, TimeSrv } from 'app/features/dashboard/services/TimeSrv'; @@ -63,7 +63,7 @@ import { RowContextOptions } from '@grafana/ui/src/components/Logs/LogRowContext import syntax from './syntax'; import { DEFAULT_RESOLUTION } from './components/LokiOptionFields'; import { queryLogsVolume } from 'app/core/logs_model'; -import config from 'app/core/config'; +import { doLokiChannelStream } from './streaming'; import { renderLegendFormat } from '../prometheus/legend'; export type RangeQueryOptions = DataQueryRequest | AnnotationQueryRequest; @@ -178,6 +178,12 @@ export class LokiDatasource for (const target of filteredTargets) { if (target.instant || target.queryType === LokiQueryType.Instant) { subQueries.push(this.runInstantQuery(target, request, filteredTargets.length)); + } else if ( + config.featureToggles.lokiLive && + target.queryType === LokiQueryType.Stream && + request.rangeRaw?.to === 'now' + ) { + subQueries.push(doLokiChannelStream(target, this, request)); } else { subQueries.push(this.runRangeQuery(target, request, filteredTargets.length)); } diff --git a/public/app/plugins/datasource/loki/streaming.ts b/public/app/plugins/datasource/loki/streaming.ts new file mode 100644 index 00000000000..08655ef76fd --- /dev/null +++ b/public/app/plugins/datasource/loki/streaming.ts @@ -0,0 +1,80 @@ +import { DataFrameJSON, DataQueryRequest, DataQueryResponse, LiveChannelScope, LoadingState } from '@grafana/data'; +import { getGrafanaLiveSrv } from '@grafana/runtime'; +import { map, Observable, defer, mergeMap } from 'rxjs'; +import LokiDatasource from './datasource'; +import { LokiQuery } from './types'; +import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame'; + +/** + * Calculate a unique key for the query. The key is used to pick a channel and should + * be unique for each distinct query execution plan. This key is not secure and is only picked to avoid + * possible collisions + */ +export async function getLiveStreamKey(query: LokiQuery): Promise { + const str = JSON.stringify({ expr: query.expr }); + + const msgUint8 = new TextEncoder().encode(str); // encode as (utf-8) Uint8Array + const hashBuffer = await crypto.subtle.digest('SHA-1', msgUint8); // hash the message + const hashArray = Array.from(new Uint8Array(hashBuffer.slice(0, 8))); // first 8 bytes + return hashArray.map((b) => b.toString(16).padStart(2, '0')).join(''); +} + +// This will get both v1 and v2 result formats +export function doLokiChannelStream( + query: LokiQuery, + ds: LokiDatasource, + options: DataQueryRequest +): Observable { + // maximum time to keep values + const range = options.range; + const maxDelta = range.to.valueOf() - range.from.valueOf() + 1000; + let maxLength = options.maxDataPoints ?? 1000; + if (maxLength > 100) { + // for small buffers, keep them small + maxLength *= 2; + } + + let frame: StreamingDataFrame | undefined = undefined; + const updateFrame = (msg: any) => { + if (msg?.message) { + const p = msg.message as DataFrameJSON; + if (!frame) { + frame = StreamingDataFrame.fromDataFrameJSON(p, { + maxLength, + maxDelta, + displayNameFormat: query.legendFormat, + }); + } else { + frame.push(p); + } + } + return frame; + }; + + return defer(() => getLiveStreamKey(query)).pipe( + mergeMap((key) => { + return getGrafanaLiveSrv() + .getStream({ + scope: LiveChannelScope.DataSource, + namespace: ds.uid, + path: `tail/${key}`, + data: { + ...query, + timeRange: { + from: range.from.valueOf().toString(), + to: range.to.valueOf().toString(), + }, + }, + }) + .pipe( + map((evt) => { + const frame = updateFrame(evt); + return { + data: frame ? [frame] : [], + state: LoadingState.Streaming, + }; + }) + ); + }) + ); +} diff --git a/public/app/plugins/datasource/loki/types.ts b/public/app/plugins/datasource/loki/types.ts index 35e37bdad8f..c72954e7c38 100644 --- a/public/app/plugins/datasource/loki/types.ts +++ b/public/app/plugins/datasource/loki/types.ts @@ -27,7 +27,7 @@ export enum LokiResultType { export enum LokiQueryType { Range = 'range', Instant = 'instant', - // Stream = 'stream', + Stream = 'stream', } export interface LokiQuery extends DataQuery {