mirror of
https://github.com/grafana/grafana.git
synced 2024-11-28 19:54:10 -06:00
473898e47c
* Move some thema code inside grafana * Use new codegen instead of thema for core kinds * Replace TS generator * Use new generator for go types * Remove thema from oapi generator * Remove thema from generators * Don't use kindsys/thema for core kinds * Remove kindsys/thema from plugins * Remove last thema related * Remove most of cuectx and move utils_ts into codegen. It also deletes wire dependency * Merge plugins generators * Delete thema dependency 🎉 * Fix CODEOWNERS * Fix package name * Fix TS output names * More path fixes * Fix mod codeowners * Use original plugin's name * Remove kindsys dependency 🎉 * Modify oapi schema and create an apply function to fix elasticsearch errors * cue.mod was deleted by mistake * Fix TS panels * sort imports * Fixing elasticsearch output * Downgrade oapi-codegen library * Update output ts files * More fixes * Restore old elasticsearch generated file and skip its generation. Remove core imports into plugins * More lint fixes * Add codeowners * restore embed.go file * Fix embed.go
175 lines
4.0 KiB
Go
175 lines
4.0 KiB
Go
package loki
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/url"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
|
)
|
|
|
|
func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
|
|
dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
|
|
if err != nil {
|
|
return &backend.SubscribeStreamResponse{
|
|
Status: backend.SubscribeStreamStatusNotFound,
|
|
}, err
|
|
}
|
|
|
|
// Expect tail/${key}
|
|
if !strings.HasPrefix(req.Path, "tail/") {
|
|
return &backend.SubscribeStreamResponse{
|
|
Status: backend.SubscribeStreamStatusNotFound,
|
|
}, fmt.Errorf("expected tail in channel path")
|
|
}
|
|
|
|
query, err := parseQueryModel(req.Data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if query.Expr != nil {
|
|
return &backend.SubscribeStreamResponse{
|
|
Status: backend.SubscribeStreamStatusNotFound,
|
|
}, fmt.Errorf("missing expr in channel (subscribe)")
|
|
}
|
|
|
|
dsInfo.streamsMu.RLock()
|
|
defer dsInfo.streamsMu.RUnlock()
|
|
|
|
cache, ok := dsInfo.streams[req.Path]
|
|
if ok {
|
|
msg, err := backend.NewInitialData(cache.Bytes(data.IncludeAll))
|
|
return &backend.SubscribeStreamResponse{
|
|
Status: backend.SubscribeStreamStatusOK,
|
|
InitialData: msg,
|
|
}, err
|
|
}
|
|
|
|
// nothing yet
|
|
return &backend.SubscribeStreamResponse{
|
|
Status: backend.SubscribeStreamStatusOK,
|
|
}, err
|
|
}
|
|
|
|
// Single instance for each channel (results are shared with all listeners)
|
|
func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
|
dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
query, err := parseQueryModel(req.Data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if query.Expr != nil {
|
|
return fmt.Errorf("missing expr in cuannel")
|
|
}
|
|
|
|
logger := s.logger.FromContext(ctx)
|
|
count := int64(0)
|
|
|
|
interrupt := make(chan os.Signal, 1)
|
|
signal.Notify(interrupt, os.Interrupt)
|
|
|
|
params := url.Values{}
|
|
params.Add("query", *query.Expr)
|
|
|
|
wsurl, _ := url.Parse(dsInfo.URL)
|
|
|
|
wsurl.Path = "/loki/api/v2alpha/tail"
|
|
|
|
if wsurl.Scheme == "https" {
|
|
wsurl.Scheme = "wss"
|
|
} else {
|
|
wsurl.Scheme = "ws"
|
|
}
|
|
wsurl.RawQuery = params.Encode()
|
|
|
|
logger.Info("Connecting to websocket", "url", wsurl)
|
|
c, r, err := websocket.DefaultDialer.Dial(wsurl.String(), nil)
|
|
if err != nil {
|
|
logger.Error("Error connecting to websocket", "err", err)
|
|
return fmt.Errorf("error connecting to websocket")
|
|
}
|
|
|
|
defer func() {
|
|
dsInfo.streamsMu.Lock()
|
|
delete(dsInfo.streams, req.Path)
|
|
dsInfo.streamsMu.Unlock()
|
|
if r != nil {
|
|
_ = r.Body.Close()
|
|
}
|
|
err = c.Close()
|
|
logger.Error("Closing loki websocket", "err", err)
|
|
}()
|
|
|
|
prev := data.FrameJSONCache{}
|
|
|
|
// Read all messages
|
|
done := make(chan struct{})
|
|
go func() {
|
|
defer close(done)
|
|
for {
|
|
_, message, err := c.ReadMessage()
|
|
if err != nil {
|
|
logger.Error("Websocket read:", "err", err)
|
|
return
|
|
}
|
|
|
|
frame := &data.Frame{}
|
|
err = json.Unmarshal(message, &frame)
|
|
|
|
if err == nil && frame != nil {
|
|
next, _ := data.FrameToJSONCache(frame)
|
|
if next.SameSchema(&prev) {
|
|
err = sender.SendBytes(next.Bytes(data.IncludeDataOnly))
|
|
} else {
|
|
err = sender.SendFrame(frame, data.IncludeAll)
|
|
}
|
|
prev = next
|
|
|
|
// Cache the initial data
|
|
dsInfo.streamsMu.Lock()
|
|
dsInfo.streams[req.Path] = prev
|
|
dsInfo.streamsMu.Unlock()
|
|
}
|
|
|
|
if err != nil {
|
|
logger.Error("Websocket write:", "err", err, "raw", message)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
ticker := time.NewTicker(time.Second * 60) // .Step)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-done:
|
|
logger.Info("Socket done")
|
|
return nil
|
|
case <-ctx.Done():
|
|
logger.Info("Stop streaming (context canceled)")
|
|
return nil
|
|
case t := <-ticker.C:
|
|
count++
|
|
logger.Error("Loki websocket ping?", "time", t, "count", count)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
|
|
return &backend.PublishStreamResponse{
|
|
Status: backend.PublishStreamStatusPermissionDenied,
|
|
}, nil
|
|
}
|