Chore: Update prometheus, loki, graphite and influx plugins to support contextual logs. (#57708)

This commit is contained in:
Yuriy Tseretyan
2022-10-27 12:05:06 -04:00
committed by GitHub
parent 9aac0d32f9
commit facf2b1ee8
21 changed files with 149 additions and 142 deletions

View File

@@ -8,9 +8,10 @@ import (
"testing"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/stretchr/testify/require"
)
type mockedRoundTripperForOauth struct {
@@ -122,7 +123,7 @@ func TestOauthForwardIdentity(t *testing.T) {
tracer := tracing.InitializeTracerForTest()
data, err := queryData(context.Background(), &req, &dsInfo, log.New("testlog"), tracer)
data, err := queryData(context.Background(), &req, &dsInfo, tracer)
// we do a basic check that the result is OK
require.NoError(t, err)
require.Len(t, data.Responses, 1)

View File

@@ -13,17 +13,19 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
"go.opentelemetry.io/otel/attribute"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"go.opentelemetry.io/otel/attribute"
)
var logger = log.New("tsdb.loki")
type Service struct {
im instancemgmt.InstanceManager
features featuremgmt.FeatureToggles
plog log.Logger
tracer tracing.Tracer
}
@@ -37,7 +39,6 @@ func ProvideService(httpClientProvider httpclient.Provider, features featuremgmt
return &Service{
im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)),
features: features,
plog: log.New("tsdb.loki"),
tracer: tracer,
}
}
@@ -115,8 +116,7 @@ func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceReq
if err != nil {
return err
}
return callResource(ctx, req, sender, dsInfo, s.plog)
return callResource(ctx, req, sender, dsInfo, logger.FromContext(ctx))
}
func getAuthHeadersForCallResource(headers map[string][]string) map[string]string {
@@ -170,13 +170,13 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
return result, err
}
return queryData(ctx, req, dsInfo, s.plog, s.tracer)
return queryData(ctx, req, dsInfo, s.tracer)
}
func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo, plog log.Logger, tracer tracing.Tracer) (*backend.QueryDataResponse, error) {
func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo, tracer tracing.Tracer) (*backend.QueryDataResponse, error) {
result := backend.NewQueryDataResponse()
api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, req.Headers)
api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, logger.FromContext(ctx), req.Headers)
queries, err := parseQuery(req)
if err != nil {
@@ -184,13 +184,15 @@ func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datas
}
for _, query := range queries {
plog.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr)
_, span := tracer.Start(ctx, "alerting.loki")
span.SetAttributes("expr", query.Expr, attribute.Key("expr").String(query.Expr))
span.SetAttributes("start_unixnano", query.Start, attribute.Key("start_unixnano").Int64(query.Start.UnixNano()))
span.SetAttributes("stop_unixnano", query.End, attribute.Key("stop_unixnano").Int64(query.End.UnixNano()))
defer span.End()
logger := logger.FromContext(ctx) // get logger with trace-id and other contextual info
logger.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr)
frames, err := runQuery(ctx, api, query)
queryRes := backend.DataResponse{}

View File

@@ -13,6 +13,7 @@ import (
"github.com/gorilla/websocket"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/services/featuremgmt"
)
@@ -74,6 +75,7 @@ func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest,
return fmt.Errorf("missing expr in cuannel")
}
logger := logger.FromContext(ctx)
count := int64(0)
interrupt := make(chan os.Signal, 1)
@@ -99,10 +101,10 @@ func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest,
}
wsurl.RawQuery = params.Encode()
s.plog.Info("connecting to websocket", "url", wsurl)
logger.Info("connecting to websocket", "url", wsurl)
c, r, err := websocket.DefaultDialer.Dial(wsurl.String(), nil)
if err != nil {
s.plog.Error("error connecting to websocket", "err", err)
logger.Error("error connecting to websocket", "err", err)
return fmt.Errorf("error connecting to websocket")
}
@@ -114,7 +116,7 @@ func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest,
_ = r.Body.Close()
}
err = c.Close()
s.plog.Error("closing loki websocket", "err", err)
logger.Error("closing loki websocket", "err", err)
}()
prev := data.FrameJSONCache{}
@@ -126,7 +128,7 @@ func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest,
for {
_, message, err := c.ReadMessage()
if err != nil {
s.plog.Error("websocket read:", "err", err)
logger.Error("websocket read:", "err", err)
return
}
@@ -153,7 +155,7 @@ func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest,
}
if err != nil {
s.plog.Error("websocket write:", "err", err, "raw", message)
logger.Error("websocket write:", "err", err, "raw", message)
return
}
}
@@ -165,14 +167,14 @@ func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest,
for {
select {
case <-done:
s.plog.Info("socket done")
logger.Info("socket done")
return nil
case <-ctx.Done():
s.plog.Info("stop streaming (context canceled)")
logger.Info("stop streaming (context canceled)")
return nil
case t := <-ticker.C:
count++
s.plog.Error("loki websocket ping?", "time", t, "count", count)
logger.Error("loki websocket ping?", "time", t, "count", count)
}
}
}