InfluxDB: move datasource health check to backend (#52668)

* Move datasource health check to backend

* Introduce healthcheck unit tests

* Remove unused method
This commit is contained in:
ismail simsek 2022-07-28 12:06:09 +02:00 committed by GitHub
parent 532741a8fe
commit 2f9636e698
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 313 additions and 93 deletions

View File

@ -7296,9 +7296,9 @@ exports[`better eslint`] = {
[0, 0, 0, "Unexpected any. Specify a different type.", "21"],
[0, 0, 0, "Unexpected any. Specify a different type.", "22"],
[0, 0, 0, "Do not use any type assertions.", "23"],
[0, 0, 0, "Do not use any type assertions.", "24"],
[0, 0, 0, "Unexpected any. Specify a different type.", "24"],
[0, 0, 0, "Unexpected any. Specify a different type.", "25"],
[0, 0, 0, "Do not use any type assertions.", "26"],
[0, 0, 0, "Unexpected any. Specify a different type.", "26"],
[0, 0, 0, "Unexpected any. Specify a different type.", "27"],
[0, 0, 0, "Unexpected any. Specify a different type.", "28"],
[0, 0, 0, "Unexpected any. Specify a different type.", "29"],
@ -7306,13 +7306,7 @@ exports[`better eslint`] = {
[0, 0, 0, "Unexpected any. Specify a different type.", "31"],
[0, 0, 0, "Unexpected any. Specify a different type.", "32"],
[0, 0, 0, "Unexpected any. Specify a different type.", "33"],
[0, 0, 0, "Unexpected any. Specify a different type.", "34"],
[0, 0, 0, "Unexpected any. Specify a different type.", "35"],
[0, 0, 0, "Unexpected any. Specify a different type.", "36"],
[0, 0, 0, "Unexpected any. Specify a different type.", "37"],
[0, 0, 0, "Unexpected any. Specify a different type.", "38"],
[0, 0, 0, "Unexpected any. Specify a different type.", "39"],
[0, 0, 0, "Unexpected any. Specify a different type.", "40"]
[0, 0, 0, "Unexpected any. Specify a different type.", "34"]
],
"public/app/plugins/datasource/influxdb/influx_query_model.ts:5381": [
[0, 0, 0, "Unexpected any. Specify a different type.", "0"],

View File

@ -0,0 +1,126 @@
package influxdb
import (
"context"
"errors"
"fmt"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/tsdb/influxdb/flux"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
)
const (
refID = "healthcheck"
)
func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult,
error) {
dsInfo, err := s.getDSInfo(req.PluginContext)
if err != nil {
return getHealthCheckMessage(s, "error getting datasource info", err)
}
if dsInfo == nil {
return getHealthCheckMessage(s, "", errors.New("invalid datasource info received"))
}
switch dsInfo.Version {
case influxVersionFlux:
return CheckFluxHealth(ctx, dsInfo, s, req)
case influxVersionInfluxQL:
return CheckInfluxQLHealth(ctx, dsInfo, s)
default:
return getHealthCheckMessage(s, "", errors.New("unknown influx version"))
}
}
func CheckFluxHealth(ctx context.Context, dsInfo *models.DatasourceInfo, s *Service,
req *backend.CheckHealthRequest) (*backend.CheckHealthResult,
error) {
ds, err := flux.Query(ctx, dsInfo, backend.QueryDataRequest{
PluginContext: req.PluginContext,
Queries: []backend.DataQuery{
{
RefID: refID,
JSON: []byte(`{ "query": "buckets()" }`),
Interval: 1 * time.Minute,
MaxDataPoints: 423,
TimeRange: backend.TimeRange{
From: time.Now().AddDate(0, 0, -1),
To: time.Now(),
},
},
},
})
if err != nil {
return getHealthCheckMessage(s, "error performing flux query", err)
}
if res, ok := ds.Responses[refID]; ok {
if res.Error != nil {
return getHealthCheckMessage(s, "error reading buckets", res.Error)
}
if len(res.Frames) > 0 && len(res.Frames[0].Fields) > 0 {
return getHealthCheckMessage(s, fmt.Sprintf("%d buckets found", res.Frames[0].Fields[0].Len()), nil)
}
}
return getHealthCheckMessage(s, "", errors.New("error getting flux query buckets"))
}
func CheckInfluxQLHealth(ctx context.Context, dsInfo *models.DatasourceInfo, s *Service) (*backend.CheckHealthResult, error) {
queryString := "SHOW measurements"
hcRequest, err := s.createRequest(ctx, dsInfo, queryString)
if err != nil {
return getHealthCheckMessage(s, "error creating influxDB healthcheck request", err)
}
res, err := dsInfo.HTTPClient.Do(hcRequest)
if err != nil {
return getHealthCheckMessage(s, "error performing influxQL query", err)
}
defer func() {
if err := res.Body.Close(); err != nil {
s.glog.Warn("failed to close response body", "err", err)
}
}()
if res.StatusCode/100 != 2 {
return getHealthCheckMessage(s, "", fmt.Errorf("error reading InfluxDB. Status Code: %d", res.StatusCode))
}
resp := s.responseParser.Parse(res.Body, []Query{{
RefID: refID,
UseRawQuery: true,
RawQuery: queryString,
}})
if res, ok := resp.Responses[refID]; ok {
if res.Error != nil {
return getHealthCheckMessage(s, "error reading influxDB", res.Error)
}
if len(res.Frames) > 0 && len(res.Frames[0].Fields) > 0 {
return getHealthCheckMessage(s, fmt.Sprintf("%d measurements found", res.Frames[0].Fields[0].Len()), nil)
}
}
return getHealthCheckMessage(s, "", errors.New("error connecting influxDB influxQL"))
}
func getHealthCheckMessage(s *Service, message string, err error) (*backend.CheckHealthResult, error) {
if err == nil {
return &backend.CheckHealthResult{
Status: backend.HealthStatusOk,
Message: fmt.Sprintf("datasource is working. %s", message),
}, nil
}
s.glog.Warn("error performing influxdb healthcheck", "err", err.Error())
errorMessage := fmt.Sprintf("%s %s", err.Error(), message)
return &backend.CheckHealthResult{
Status: backend.HealthStatusError,
Message: errorMessage,
}, nil
}

View File

@ -0,0 +1,50 @@
package influxdb
import (
"context"
"testing"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/assert"
)
func Test_healthcheck(t *testing.T) {
t.Run("should do successful health check for version flux ", func(t *testing.T) {
s := GetMockService(influxVersionFlux, RoundTripper{
Body: `#datatype,string,long,string,string,string,string,long
#group,false,false,false,false,true,false,false
#default,_result,,,,,,
,result,table,name,id,organizationID,retentionPolicy,retentionPeriod
,,0,_monitoring,effbe6d547e1c085,c678d3a458299f4e,,604800000000000
,,0,_tasks,9ac37d3047b0970c,c678d3a458299f4e,,259200000000000
,,0,mybucket,98184c45c69fc01e,c678d3a458299f4e,,0`,
})
res, err := s.CheckHealth(context.Background(), &backend.CheckHealthRequest{
PluginContext: backend.PluginContext{},
Headers: nil,
})
assert.NoError(t, err)
assert.Equal(t, backend.HealthStatusOk, res.Status)
})
t.Run("should do successful health check for version InfluxQL", func(t *testing.T) {
s := GetMockService(influxVersionInfluxQL, RoundTripper{
Body: `{"results": [{"series": [{"columns": ["name"],"name": "measurements","values": [["cpu"],["disk"],["diskio"],["kernel"],["mem"],["processes"],["swap"],["system"]]}],"statement_id": 0}]}`,
})
res, err := s.CheckHealth(context.Background(), &backend.CheckHealthRequest{
PluginContext: backend.PluginContext{},
Headers: nil,
})
assert.NoError(t, err)
assert.Equal(t, backend.HealthStatusOk, res.Status)
})
t.Run("should fail when version is unknown", func(t *testing.T) {
s := GetMockService("unknown-influx-version", RoundTripper{
Body: `{"results": [{"series": [{"columns": ["name"],"name": "measurements","values": [["cpu"],["disk"],["diskio"],["kernel"],["mem"],["processes"],["swap"],["system"]]}],"statement_id": 0}]}`,
})
res, _ := s.CheckHealth(context.Background(), &backend.CheckHealthRequest{
PluginContext: backend.PluginContext{},
Headers: nil,
})
assert.Equal(t, backend.HealthStatusError, res.Status)
})
}

View File

@ -64,11 +64,15 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
if maxSeries == 0 {
maxSeries = 1000
}
version := jsonData.Version
if version == "" {
version = influxVersionInfluxQL
}
model := &models.DatasourceInfo{
HTTPClient: client,
URL: settings.URL,
Database: settings.Database,
Version: jsonData.Version,
Version: version,
HTTPMode: httpMode,
TimeInterval: jsonData.TimeInterval,
DefaultBucket: jsonData.DefaultBucket,

View File

@ -0,0 +1,122 @@
package influxdb
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"net/http"
"github.com/grafana/grafana-plugin-sdk-go/backend"
sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
)
type fakeHttpClientProvider struct {
httpclient.Provider
opts sdkhttpclient.Options
res *http.Response
rt RoundTripper
}
func (p *fakeHttpClientProvider) New(opts ...sdkhttpclient.Options) (*http.Client, error) {
p.opts = opts[0]
c, err := sdkhttpclient.New(opts[0])
if err != nil {
return nil, err
}
c.Transport = p
return c, nil
}
func (p *fakeHttpClientProvider) GetTransport(opts ...sdkhttpclient.Options) (http.RoundTripper, error) {
p.opts = opts[0]
return http.DefaultTransport, nil
}
func (p *fakeHttpClientProvider) RoundTrip(req *http.Request) (*http.Response, error) {
return p.rt.RoundTrip(req)
}
type fakeInstance struct {
version string
fakeRoundTripper RoundTripper
}
func (f *fakeInstance) Get(pluginContext backend.PluginContext) (instancemgmt.Instance, error) {
fp := &fakeHttpClientProvider{
opts: sdkhttpclient.Options{
Timeouts: &sdkhttpclient.DefaultTimeoutOptions,
},
res: &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{}`))),
},
rt: f.fakeRoundTripper,
}
client, err := fp.New(sdkhttpclient.Options{})
if err != nil {
return nil, err
}
return &models.DatasourceInfo{
HTTPClient: client,
Token: "sometoken",
URL: "https://awesome-influx.com",
Database: "testdb",
Version: f.version,
HTTPMode: "GET",
TimeInterval: "10s",
DefaultBucket: "testbucket",
Organization: "testorg",
MaxSeries: 2,
}, nil
}
func (f *fakeInstance) Do(pluginContext backend.PluginContext, fn instancemgmt.InstanceCallbackFunc) error {
return nil
}
type RoundTripper struct {
Body string
FileName string // filename (relative path of where it is being called)
}
func (rt *RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
res := &http.Response{
StatusCode: http.StatusOK,
Status: "200 OK",
Body: ioutil.NopCloser(bytes.NewBufferString("{}")),
}
if rt.Body != "" {
res.Body = ioutil.NopCloser(bytes.NewBufferString(rt.Body))
}
if rt.FileName != "" {
b, err := ioutil.ReadFile(rt.FileName)
if err != nil {
return res, fmt.Errorf("error reading testdata file %s", rt.FileName)
}
reader := ioutil.NopCloser(bytes.NewReader(b))
res.Body = reader
}
if res.Body != nil {
return res, nil
}
return nil, errors.New("fake client not working as expected. If you got this error fix this method")
}
func GetMockService(version string, rt RoundTripper) *Service {
return &Service{
queryParser: &InfluxdbQueryParser{},
responseParser: &ResponseParser{},
glog: log.New("tsdb.influxdb"),
im: &fakeInstance{
version: version,
fakeRoundTripper: rt,
},
}
}

View File

@ -0,0 +1,6 @@
package influxdb
const (
influxVersionFlux = "Flux"
influxVersionInfluxQL = "InfluxQL"
)

View File

@ -1,7 +1,6 @@
import { cloneDeep, extend, get, groupBy, has, isString, map as _map, omit, pick, reduce } from 'lodash';
import { cloneDeep, extend, groupBy, has, isString, map as _map, omit, pick, reduce } from 'lodash';
import { lastValueFrom, Observable, of, throwError } from 'rxjs';
import { catchError, map } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid';
import {
AnnotationEvent,
@ -13,9 +12,7 @@ import {
DataQueryResponse,
DataSourceInstanceSettings,
dateMath,
dateTime,
FieldType,
LoadingState,
MetricFindValue,
QueryResultMeta,
ScopedVars,
@ -555,85 +552,6 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
).join('&');
}
testDatasource() {
if (this.isFlux) {
// TODO: eventually use the real /health endpoint
const request: DataQueryRequest<InfluxQuery> = {
targets: [{ refId: 'test', query: 'buckets()' }],
requestId: `${this.id}-health-${uuidv4()}`,
dashboardId: 0,
panelId: 0,
interval: '1m',
intervalMs: 60000,
maxDataPoints: 423,
range: {
from: dateTime(1000),
to: dateTime(2000),
},
} as DataQueryRequest<InfluxQuery>;
return lastValueFrom(super.query(request))
.then((res: DataQueryResponse) => {
if (!res || !res.data || res.state !== LoadingState.Done) {
console.error('InfluxDB Error', res);
return { status: 'error', message: 'Error reading InfluxDB' };
}
const first = res.data[0];
if (first && first.length) {
return { status: 'success', message: `${first.length} buckets found` };
}
console.error('InfluxDB Error', res);
return { status: 'error', message: 'Error reading buckets' };
})
.catch((err: any) => {
console.error('InfluxDB Error', err);
return { status: 'error', message: err.message };
});
}
if (this.isMigrationToggleOnAndIsAccessProxy()) {
const target: InfluxQuery = {
refId: 'metricFindQuery',
query: 'SHOW TAG KEYS',
rawQuery: true,
};
return lastValueFrom(super.query({ targets: [target] } as DataQueryRequest))
.then((res: DataQueryResponse) => {
if (!res || !res.data || res.state !== LoadingState.Done) {
return {
status: 'error',
message: 'Error reading InfluxDB.',
};
}
if (res.data?.length) {
return { status: 'success', message: 'Data source is working.' };
}
return {
status: 'error',
message: 'Successfully connected to InfluxDB, but no tags found.',
};
})
.catch((err: any) => {
return { status: 'error', message: err.message };
});
}
const queryBuilder = new InfluxQueryBuilder({ measurement: '', tags: [] }, this.database);
const query = queryBuilder.buildExploreQuery('RETENTION POLICIES');
return lastValueFrom(this._seriesQuery(query))
.then((res: any) => {
const error = get(res, 'results[0].error');
if (error) {
return { status: 'error', message: error };
}
return { status: 'success', message: 'Data source is working' };
})
.catch((err: any) => {
return { status: 'error', message: err.message };
});
}
_influxRequest(method: string, url: string, data: any, options?: any) {
const currentUrl = this.urls.shift()!;
this.urls.push(currentUrl);