Loki: support live streaming (#42804)

This commit is contained in:
Ryan McKinley 2022-03-01 14:46:52 -08:00 committed by GitHub
parent 09cde9a700
commit 796bc27f75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 419 additions and 11 deletions

View File

@ -40,6 +40,7 @@ export interface FeatureToggles {
showFeatureFlagsInUI?: boolean; showFeatureFlagsInUI?: boolean;
disable_http_request_histogram?: boolean; disable_http_request_histogram?: boolean;
validatedQueries?: boolean; validatedQueries?: boolean;
lokiLive?: boolean;
swaggerUi?: boolean; swaggerUi?: boolean;
featureHighlights?: boolean; featureHighlights?: boolean;
dashboardComments?: boolean; dashboardComments?: boolean;

View File

@ -132,6 +132,11 @@ var (
State: FeatureStateAlpha, State: FeatureStateAlpha,
RequiresDevMode: true, RequiresDevMode: true,
}, },
{
Name: "lokiLive",
Description: "support websocket streaming for loki (early prototype)",
State: FeatureStateAlpha,
},
{ {
Name: "swaggerUi", Name: "swaggerUi",
Description: "Serves swagger UI", Description: "Serves swagger UI",

View File

@ -99,6 +99,10 @@ const (
// only execute the query saved in a panel // only execute the query saved in a panel
FlagValidatedQueries = "validatedQueries" FlagValidatedQueries = "validatedQueries"
// FlagLokiLive
// support websocket streaming for loki (early prototype)
FlagLokiLive = "lokiLive"
// FlagSwaggerUi // FlagSwaggerUi
// Serves swagger UI // Serves swagger UI
FlagSwaggerUi = "swaggerUi" FlagSwaggerUi = "swaggerUi"

View File

@ -2,9 +2,11 @@ package loki
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"regexp" "regexp"
"sync"
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource" "github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
@ -22,6 +24,11 @@ type Service struct {
tracer tracing.Tracer tracer tracing.Tracer
} }
var (
_ backend.QueryDataHandler = (*Service)(nil)
_ backend.StreamHandler = (*Service)(nil)
)
func ProvideService(httpClientProvider httpclient.Provider, tracer tracing.Tracer) *Service { func ProvideService(httpClientProvider httpclient.Provider, tracer tracing.Tracer) *Service {
return &Service{ return &Service{
im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)), im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)),
@ -37,6 +44,10 @@ var (
type datasourceInfo struct { type datasourceInfo struct {
HTTPClient *http.Client HTTPClient *http.Client
URL string URL string
// open streams
streams map[string]data.FrameJSONCache
streamsMu sync.RWMutex
} }
type QueryJSONModel struct { type QueryJSONModel struct {
@ -50,6 +61,12 @@ type QueryJSONModel struct {
VolumeQuery bool `json:"volumeQuery"` VolumeQuery bool `json:"volumeQuery"`
} }
func parseQueryModel(raw json.RawMessage) (*QueryJSONModel, error) {
model := &QueryJSONModel{}
err := json.Unmarshal(raw, model)
return model, err
}
func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc { func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc {
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
opts, err := settings.HTTPClientOptions() opts, err := settings.HTTPClientOptions()
@ -65,6 +82,7 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
model := &datasourceInfo{ model := &datasourceInfo{
HTTPClient: client, HTTPClient: client,
URL: settings.URL, URL: settings.URL,
streams: make(map[string]data.FrameJSONCache),
} }
return model, nil return model, nil
} }

View File

@ -1,7 +1,6 @@
package loki package loki
import ( import (
"encoding/json"
"fmt" "fmt"
"math" "math"
"strconv" "strconv"
@ -71,8 +70,7 @@ func parseQueryType(jsonValue string) (QueryType, error) {
func parseQuery(queryContext *backend.QueryDataRequest) ([]*lokiQuery, error) { func parseQuery(queryContext *backend.QueryDataRequest) ([]*lokiQuery, error) {
qs := []*lokiQuery{} qs := []*lokiQuery{}
for _, query := range queryContext.Queries { for _, query := range queryContext.Queries {
model := &QueryJSONModel{} model, err := parseQueryModel(query.JSON)
err := json.Unmarshal(query.JSON, model)
if err != nil { if err != nil {
return nil, err return nil, err
} }

200
pkg/tsdb/loki/streaming.go Normal file
View File

@ -0,0 +1,200 @@
package loki
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
func (s *Service) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
dsInfo, err := s.getDSInfo(req.PluginContext)
if err != nil {
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusNotFound,
}, err
}
// Expect tail/${key}
if !strings.HasPrefix(req.Path, "tail/") {
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusNotFound,
}, fmt.Errorf("expected tail in channel path")
}
query, err := parseQueryModel(req.Data)
if err != nil {
return nil, err
}
if query.Expr == "" {
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusNotFound,
}, fmt.Errorf("missing expr in channel (subscribe)")
}
dsInfo.streamsMu.RLock()
defer dsInfo.streamsMu.RUnlock()
cache, ok := dsInfo.streams[req.Path]
if ok {
msg, err := backend.NewInitialData(cache.Bytes(data.IncludeAll))
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusOK,
InitialData: msg,
}, err
}
// nothing yet
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusOK,
}, err
}
// Single instance for each channel (results are shared with all listeners)
func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
dsInfo, err := s.getDSInfo(req.PluginContext)
if err != nil {
return err
}
query, err := parseQueryModel(req.Data)
if err != nil {
return err
}
if query.Expr == "" {
return fmt.Errorf("missing expr in cuannel")
}
count := int64(0)
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
params := url.Values{}
params.Add("query", query.Expr)
isV1 := false
wsurl, _ := url.Parse(dsInfo.URL)
// Check if the v2alpha endpoint exists
wsurl.Path = "/loki/api/v2alpha/tail"
if !is400(dsInfo.HTTPClient, wsurl) {
isV1 = true
wsurl.Path = "/loki/api/v1/tail"
}
if wsurl.Scheme == "https" {
wsurl.Scheme = "wss"
} else {
wsurl.Scheme = "ws"
}
wsurl.RawQuery = params.Encode()
s.plog.Info("connecting to websocket", "url", wsurl)
c, r, err := websocket.DefaultDialer.Dial(wsurl.String(), nil)
if err != nil {
s.plog.Error("error connecting to websocket", "err", err)
return fmt.Errorf("error connecting to websocket")
}
defer func() {
dsInfo.streamsMu.Lock()
delete(dsInfo.streams, req.Path)
dsInfo.streamsMu.Unlock()
if r != nil {
_ = r.Body.Close()
}
err = c.Close()
s.plog.Error("closing loki websocket", "err", err)
}()
prev := data.FrameJSONCache{}
// Read all messages
done := make(chan struct{})
go func() {
defer close(done)
for {
_, message, err := c.ReadMessage()
if err != nil {
s.plog.Error("websocket read:", "err", err)
return
}
frame := &data.Frame{}
if isV1 {
frame, err = lokiBytesToLabeledFrame(message)
} else {
err = json.Unmarshal(message, &frame)
}
if err == nil && frame != nil {
next, _ := data.FrameToJSONCache(frame)
if next.SameSchema(&prev) {
err = sender.SendBytes(next.Bytes(data.IncludeDataOnly))
} else {
err = sender.SendFrame(frame, data.IncludeAll)
}
prev = next
// Cache the initial data
dsInfo.streamsMu.Lock()
dsInfo.streams[req.Path] = prev
dsInfo.streamsMu.Unlock()
}
if err != nil {
s.plog.Error("websocket write:", "err", err, "raw", message)
return
}
}
}()
ticker := time.NewTicker(time.Second * 60) //.Step)
defer ticker.Stop()
for {
select {
case <-done:
s.plog.Info("socket done")
return nil
case <-ctx.Done():
s.plog.Info("stop streaming (context canceled)")
return nil
case t := <-ticker.C:
count++
s.plog.Error("loki websocket ping?", "time", t, "count", count)
}
}
}
func (s *Service) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
return &backend.PublishStreamResponse{
Status: backend.PublishStreamStatusPermissionDenied,
}, nil
}
// if the v2 endpoint exists it will give a 400 rather than 404/500
func is400(client *http.Client, url *url.URL) bool {
req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
return false
}
rsp, err := client.Do(req)
if err != nil {
return false
}
defer func() {
_ = rsp.Body.Close()
}()
return rsp.StatusCode == 400 // will be true
}

View File

@ -0,0 +1,52 @@
package loki
import (
"encoding/json"
"strconv"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
type lokiResponse struct {
Streams []lokiStream `json:"streams"`
}
type lokiStream struct {
Stream data.Labels `json:"stream"`
Values [][2]string `json:"values"`
}
func lokiBytesToLabeledFrame(msg []byte) (*data.Frame, error) {
rsp := &lokiResponse{}
err := json.Unmarshal(msg, rsp)
if err != nil {
return nil, err
}
labelField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
labelField.Name = "__labels" // for now, avoid automatically spreading this by labels
timeField.Name = "Time"
lineField.Name = "Line"
for _, stream := range rsp.Streams {
label := stream.Stream.String() // TODO -- make it match prom labels!
for _, value := range stream.Values {
n, err := strconv.ParseInt(value[0], 10, 64)
if err != nil {
continue
}
ts := time.Unix(0, n)
line := value[1]
labelField.Append(label)
timeField.Append(ts)
lineField.Append(line)
}
}
return data.NewFrame("", labelField, timeField, lineField), nil
}

View File

@ -0,0 +1,40 @@
package loki
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestLokiFramer(t *testing.T) {
t.Run("converting metric name", func(t *testing.T) {
msg := []byte(`{"streams":[
{"stream":
{"job":"node-exporter","metric":"go_memstats_heap_inuse_bytes"},
"values":[
["1642091525267322910","line1"]
]},
{"stream":
{"job":"node-exporter","metric":"go_memstats_heap_inuse_bytes"},
"values":[
["1642091525770585774","line2"],
["1642091525770585775","line3"]
]},
{"stream":
{"metric":"go_memstats_heap_inuse_bytes","job":"node-exporter"},
"values":[
["1642091526263785281","line4"]
]}
]}`)
frame, err := lokiBytesToLabeledFrame(msg)
require.NoError(t, err)
lines := frame.Fields[2]
require.Equal(t, 4, lines.Len())
require.Equal(t, "line1", lines.At(0))
require.Equal(t, "line2", lines.At(1))
require.Equal(t, "line3", lines.At(2))
require.Equal(t, "line4", lines.At(3))
})
}

View File

@ -6,6 +6,7 @@ import { map } from 'lodash';
// Types // Types
import { InlineFormLabel, RadioButtonGroup, InlineField, Input, Select } from '@grafana/ui'; import { InlineFormLabel, RadioButtonGroup, InlineField, Input, Select } from '@grafana/ui';
import { SelectableValue } from '@grafana/data'; import { SelectableValue } from '@grafana/data';
import { config } from '@grafana/runtime';
import { LokiQuery, LokiQueryType } from '../types'; import { LokiQuery, LokiQueryType } from '../types';
export interface LokiOptionFieldsProps { export interface LokiOptionFieldsProps {
@ -24,13 +25,16 @@ const queryTypeOptions: Array<SelectableValue<LokiQueryType>> = [
label: 'Instant', label: 'Instant',
description: 'Run query against a single point in time. For this query, the "To" time is used.', description: 'Run query against a single point in time. For this query, the "To" time is used.',
}, },
// {
// value: LokiQueryType.Stream,
// label: 'Stream',
// description: 'Run a query and keep sending results on an interval',
// },
]; ];
if (config.featureToggles.lokiLive) {
queryTypeOptions.push({
value: LokiQueryType.Stream,
label: 'Stream',
description: 'Run a query and keep sending results on an interval',
});
}
export const DEFAULT_RESOLUTION: SelectableValue<number> = { export const DEFAULT_RESOLUTION: SelectableValue<number> = {
value: 1, value: 1,
label: '1/1', label: '1/1',

View File

@ -33,7 +33,7 @@ import {
ScopedVars, ScopedVars,
TimeRange, TimeRange,
} from '@grafana/data'; } from '@grafana/data';
import { BackendSrvRequest, FetchError, getBackendSrv, DataSourceWithBackend } from '@grafana/runtime'; import { BackendSrvRequest, FetchError, getBackendSrv, config, DataSourceWithBackend } from '@grafana/runtime';
import { getTemplateSrv, TemplateSrv } from 'app/features/templating/template_srv'; import { getTemplateSrv, TemplateSrv } from 'app/features/templating/template_srv';
import { addLabelToQuery } from './add_label_to_query'; import { addLabelToQuery } from './add_label_to_query';
import { getTimeSrv, TimeSrv } from 'app/features/dashboard/services/TimeSrv'; import { getTimeSrv, TimeSrv } from 'app/features/dashboard/services/TimeSrv';
@ -63,7 +63,7 @@ import { RowContextOptions } from '@grafana/ui/src/components/Logs/LogRowContext
import syntax from './syntax'; import syntax from './syntax';
import { DEFAULT_RESOLUTION } from './components/LokiOptionFields'; import { DEFAULT_RESOLUTION } from './components/LokiOptionFields';
import { queryLogsVolume } from 'app/core/logs_model'; import { queryLogsVolume } from 'app/core/logs_model';
import config from 'app/core/config'; import { doLokiChannelStream } from './streaming';
import { renderLegendFormat } from '../prometheus/legend'; import { renderLegendFormat } from '../prometheus/legend';
export type RangeQueryOptions = DataQueryRequest<LokiQuery> | AnnotationQueryRequest<LokiQuery>; export type RangeQueryOptions = DataQueryRequest<LokiQuery> | AnnotationQueryRequest<LokiQuery>;
@ -178,6 +178,12 @@ export class LokiDatasource
for (const target of filteredTargets) { for (const target of filteredTargets) {
if (target.instant || target.queryType === LokiQueryType.Instant) { if (target.instant || target.queryType === LokiQueryType.Instant) {
subQueries.push(this.runInstantQuery(target, request, filteredTargets.length)); subQueries.push(this.runInstantQuery(target, request, filteredTargets.length));
} else if (
config.featureToggles.lokiLive &&
target.queryType === LokiQueryType.Stream &&
request.rangeRaw?.to === 'now'
) {
subQueries.push(doLokiChannelStream(target, this, request));
} else { } else {
subQueries.push(this.runRangeQuery(target, request, filteredTargets.length)); subQueries.push(this.runRangeQuery(target, request, filteredTargets.length));
} }

View File

@ -0,0 +1,80 @@
import { DataFrameJSON, DataQueryRequest, DataQueryResponse, LiveChannelScope, LoadingState } from '@grafana/data';
import { getGrafanaLiveSrv } from '@grafana/runtime';
import { map, Observable, defer, mergeMap } from 'rxjs';
import LokiDatasource from './datasource';
import { LokiQuery } from './types';
import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame';
/**
* Calculate a unique key for the query. The key is used to pick a channel and should
* be unique for each distinct query execution plan. This key is not secure and is only picked to avoid
* possible collisions
*/
export async function getLiveStreamKey(query: LokiQuery): Promise<string> {
const str = JSON.stringify({ expr: query.expr });
const msgUint8 = new TextEncoder().encode(str); // encode as (utf-8) Uint8Array
const hashBuffer = await crypto.subtle.digest('SHA-1', msgUint8); // hash the message
const hashArray = Array.from(new Uint8Array(hashBuffer.slice(0, 8))); // first 8 bytes
return hashArray.map((b) => b.toString(16).padStart(2, '0')).join('');
}
// This will get both v1 and v2 result formats
export function doLokiChannelStream(
query: LokiQuery,
ds: LokiDatasource,
options: DataQueryRequest<LokiQuery>
): Observable<DataQueryResponse> {
// maximum time to keep values
const range = options.range;
const maxDelta = range.to.valueOf() - range.from.valueOf() + 1000;
let maxLength = options.maxDataPoints ?? 1000;
if (maxLength > 100) {
// for small buffers, keep them small
maxLength *= 2;
}
let frame: StreamingDataFrame | undefined = undefined;
const updateFrame = (msg: any) => {
if (msg?.message) {
const p = msg.message as DataFrameJSON;
if (!frame) {
frame = StreamingDataFrame.fromDataFrameJSON(p, {
maxLength,
maxDelta,
displayNameFormat: query.legendFormat,
});
} else {
frame.push(p);
}
}
return frame;
};
return defer(() => getLiveStreamKey(query)).pipe(
mergeMap((key) => {
return getGrafanaLiveSrv()
.getStream<any>({
scope: LiveChannelScope.DataSource,
namespace: ds.uid,
path: `tail/${key}`,
data: {
...query,
timeRange: {
from: range.from.valueOf().toString(),
to: range.to.valueOf().toString(),
},
},
})
.pipe(
map((evt) => {
const frame = updateFrame(evt);
return {
data: frame ? [frame] : [],
state: LoadingState.Streaming,
};
})
);
})
);
}

View File

@ -27,7 +27,7 @@ export enum LokiResultType {
export enum LokiQueryType { export enum LokiQueryType {
Range = 'range', Range = 'range',
Instant = 'instant', Instant = 'instant',
// Stream = 'stream', Stream = 'stream',
} }
export interface LokiQuery extends DataQuery { export interface LokiQuery extends DataQuery {