grafana/pkg/tsdb/loki/streaming.go
2022-03-01 14:46:52 -08:00

201 lines
4.6 KiB
Go

package loki
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
func (s *Service) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
dsInfo, err := s.getDSInfo(req.PluginContext)
if err != nil {
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusNotFound,
}, err
}
// Expect tail/${key}
if !strings.HasPrefix(req.Path, "tail/") {
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusNotFound,
}, fmt.Errorf("expected tail in channel path")
}
query, err := parseQueryModel(req.Data)
if err != nil {
return nil, err
}
if query.Expr == "" {
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusNotFound,
}, fmt.Errorf("missing expr in channel (subscribe)")
}
dsInfo.streamsMu.RLock()
defer dsInfo.streamsMu.RUnlock()
cache, ok := dsInfo.streams[req.Path]
if ok {
msg, err := backend.NewInitialData(cache.Bytes(data.IncludeAll))
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusOK,
InitialData: msg,
}, err
}
// nothing yet
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusOK,
}, err
}
// Single instance for each channel (results are shared with all listeners)
func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
dsInfo, err := s.getDSInfo(req.PluginContext)
if err != nil {
return err
}
query, err := parseQueryModel(req.Data)
if err != nil {
return err
}
if query.Expr == "" {
return fmt.Errorf("missing expr in cuannel")
}
count := int64(0)
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
params := url.Values{}
params.Add("query", query.Expr)
isV1 := false
wsurl, _ := url.Parse(dsInfo.URL)
// Check if the v2alpha endpoint exists
wsurl.Path = "/loki/api/v2alpha/tail"
if !is400(dsInfo.HTTPClient, wsurl) {
isV1 = true
wsurl.Path = "/loki/api/v1/tail"
}
if wsurl.Scheme == "https" {
wsurl.Scheme = "wss"
} else {
wsurl.Scheme = "ws"
}
wsurl.RawQuery = params.Encode()
s.plog.Info("connecting to websocket", "url", wsurl)
c, r, err := websocket.DefaultDialer.Dial(wsurl.String(), nil)
if err != nil {
s.plog.Error("error connecting to websocket", "err", err)
return fmt.Errorf("error connecting to websocket")
}
defer func() {
dsInfo.streamsMu.Lock()
delete(dsInfo.streams, req.Path)
dsInfo.streamsMu.Unlock()
if r != nil {
_ = r.Body.Close()
}
err = c.Close()
s.plog.Error("closing loki websocket", "err", err)
}()
prev := data.FrameJSONCache{}
// Read all messages
done := make(chan struct{})
go func() {
defer close(done)
for {
_, message, err := c.ReadMessage()
if err != nil {
s.plog.Error("websocket read:", "err", err)
return
}
frame := &data.Frame{}
if isV1 {
frame, err = lokiBytesToLabeledFrame(message)
} else {
err = json.Unmarshal(message, &frame)
}
if err == nil && frame != nil {
next, _ := data.FrameToJSONCache(frame)
if next.SameSchema(&prev) {
err = sender.SendBytes(next.Bytes(data.IncludeDataOnly))
} else {
err = sender.SendFrame(frame, data.IncludeAll)
}
prev = next
// Cache the initial data
dsInfo.streamsMu.Lock()
dsInfo.streams[req.Path] = prev
dsInfo.streamsMu.Unlock()
}
if err != nil {
s.plog.Error("websocket write:", "err", err, "raw", message)
return
}
}
}()
ticker := time.NewTicker(time.Second * 60) //.Step)
defer ticker.Stop()
for {
select {
case <-done:
s.plog.Info("socket done")
return nil
case <-ctx.Done():
s.plog.Info("stop streaming (context canceled)")
return nil
case t := <-ticker.C:
count++
s.plog.Error("loki websocket ping?", "time", t, "count", count)
}
}
}
func (s *Service) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
return &backend.PublishStreamResponse{
Status: backend.PublishStreamStatusPermissionDenied,
}, nil
}
// if the v2 endpoint exists it will give a 400 rather than 404/500
func is400(client *http.Client, url *url.URL) bool {
req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
return false
}
rsp, err := client.Do(req)
if err != nil {
return false
}
defer func() {
_ = rsp.Body.Close()
}()
return rsp.StatusCode == 400 // will be true
}