diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts index 8b75df36d37..33708fe1626 100644 --- a/packages/grafana-data/src/types/featureToggles.gen.ts +++ b/packages/grafana-data/src/types/featureToggles.gen.ts @@ -36,6 +36,7 @@ export interface FeatureToggles { showFeatureFlagsInUI?: boolean; publicDashboards?: boolean; lokiLive?: boolean; + lokiDataframeApi?: boolean; swaggerUi?: boolean; featureHighlights?: boolean; dashboardComments?: boolean; diff --git a/pkg/plugins/manager/manager_integration_test.go b/pkg/plugins/manager/manager_integration_test.go index a4fc6f3335a..3aaade7aa19 100644 --- a/pkg/plugins/manager/manager_integration_test.go +++ b/pkg/plugins/manager/manager_integration_test.go @@ -79,7 +79,7 @@ func TestPluginManager_int_init(t *testing.T) { es := elasticsearch.ProvideService(hcp) grap := graphite.ProvideService(hcp, tracer) idb := influxdb.ProvideService(hcp) - lk := loki.ProvideService(hcp, tracer) + lk := loki.ProvideService(hcp, features, tracer) otsdb := opentsdb.ProvideService(hcp) pr := prometheus.ProvideService(hcp, cfg, features, tracer) tmpo := tempo.ProvideService(hcp) diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go index cbd439bcf74..620282f855d 100644 --- a/pkg/services/featuremgmt/registry.go +++ b/pkg/services/featuremgmt/registry.go @@ -117,6 +117,11 @@ var ( Description: "support websocket streaming for loki (early prototype)", State: FeatureStateAlpha, }, + { + Name: "lokiDataframeApi", + Description: "use experimental loki api for websocket streaming (early prototype)", + State: FeatureStateAlpha, + }, { Name: "swaggerUi", Description: "Serves swagger UI", diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go index 9a530e78b6a..7e8fca720d2 100644 --- a/pkg/services/featuremgmt/toggles_gen.go +++ b/pkg/services/featuremgmt/toggles_gen.go @@ -87,6 +87,10 @@ const ( // support websocket streaming for loki (early prototype) FlagLokiLive = "lokiLive" + // FlagLokiDataframeApi + // use experimental loki api for websocket streaming (early prototype) + FlagLokiDataframeApi = "lokiDataframeApi" + // FlagSwaggerUi // Serves swagger UI FlagSwaggerUi = "swaggerUi" diff --git a/pkg/tsdb/loki/loki.go b/pkg/tsdb/loki/loki.go index 35a09e296c7..fb69e6cc11b 100644 --- a/pkg/tsdb/loki/loki.go +++ b/pkg/tsdb/loki/loki.go @@ -16,13 +16,15 @@ import ( "github.com/grafana/grafana/pkg/infra/httpclient" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/featuremgmt" "go.opentelemetry.io/otel/attribute" ) type Service struct { - im instancemgmt.InstanceManager - plog log.Logger - tracer tracing.Tracer + im instancemgmt.InstanceManager + features featuremgmt.FeatureToggles + plog log.Logger + tracer tracing.Tracer } var ( @@ -31,11 +33,12 @@ var ( _ backend.CallResourceHandler = (*Service)(nil) ) -func ProvideService(httpClientProvider httpclient.Provider, tracer tracing.Tracer) *Service { +func ProvideService(httpClientProvider httpclient.Provider, features featuremgmt.FeatureToggles, tracer tracing.Tracer) *Service { return &Service{ - im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)), - plog: log.New("tsdb.loki"), - tracer: tracer, + im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)), + features: features, + plog: log.New("tsdb.loki"), + tracer: tracer, } } diff --git a/pkg/tsdb/loki/streaming.go b/pkg/tsdb/loki/streaming.go index e2c6ff96180..078c08b3a99 100644 --- a/pkg/tsdb/loki/streaming.go +++ b/pkg/tsdb/loki/streaming.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "net/http" "net/url" "os" "os/signal" @@ -14,6 +13,7 @@ import ( "github.com/gorilla/websocket" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/services/featuremgmt" ) func (s *Service) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { @@ -82,13 +82,13 @@ func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest, params := url.Values{} params.Add("query", query.Expr) - isV1 := false + lokiDataframeApi := s.features.IsEnabled(featuremgmt.FlagLokiDataframeApi) + wsurl, _ := url.Parse(dsInfo.URL) - // Check if the v2alpha endpoint exists - wsurl.Path = "/loki/api/v2alpha/tail" - if !is400(dsInfo.HTTPClient, wsurl) { - isV1 = true + if lokiDataframeApi { + wsurl.Path = "/loki/api/v2alpha/tail" + } else { wsurl.Path = "/loki/api/v1/tail" } @@ -131,7 +131,7 @@ func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest, } frame := &data.Frame{} - if isV1 { + if !lokiDataframeApi { frame, err = lokiBytesToLabeledFrame(message) } else { err = json.Unmarshal(message, &frame) @@ -182,19 +182,3 @@ func (s *Service) PublishStream(_ context.Context, _ *backend.PublishStreamReque Status: backend.PublishStreamStatusPermissionDenied, }, nil } - -// if the v2 endpoint exists it will give a 400 rather than 404/500 -func is400(client *http.Client, url *url.URL) bool { - req, err := http.NewRequest("GET", url.String(), nil) - if err != nil { - return false - } - rsp, err := client.Do(req) - if err != nil { - return false - } - defer func() { - _ = rsp.Body.Close() - }() - return rsp.StatusCode == 400 // will be true -}