Merge branch 'master' into alerting_opentsdb

This commit is contained in:
bergquist
2016-10-14 09:17:35 +02:00
175 changed files with 2737 additions and 431 deletions

View File

@@ -252,33 +252,30 @@ func NotificationTest(c *middleware.Context, dto dtos.NotificationTestCommand) R
return ApiSuccess("Test notification sent")
}
func getAlertIdForRequest(c *middleware.Context) (int64, error) {
alertId := c.QueryInt64("alertId")
panelId := c.QueryInt64("panelId")
dashboardId := c.QueryInt64("dashboardId")
if alertId == 0 && dashboardId == 0 && panelId == 0 {
return 0, fmt.Errorf("Missing alertId or dashboardId and panelId")
//POST /api/:alertId/pause
func PauseAlert(c *middleware.Context, dto dtos.PauseAlertCommand) Response {
cmd := models.PauseAlertCommand{
OrgId: c.OrgId,
AlertId: c.ParamsInt64("alertId"),
Paused: dto.Paused,
}
if alertId == 0 {
//fetch alertId
query := models.GetAlertsQuery{
OrgId: c.OrgId,
DashboardId: dashboardId,
PanelId: panelId,
}
if err := bus.Dispatch(&query); err != nil {
return 0, err
}
if len(query.Result) != 1 {
return 0, fmt.Errorf("PanelId is not unique on dashboard")
}
alertId = query.Result[0].Id
if err := bus.Dispatch(&cmd); err != nil {
return ApiError(500, "", err)
}
return alertId, nil
var response models.AlertStateType = models.AlertStateNoData
pausedState := "un paused"
if cmd.Paused {
response = models.AlertStatePaused
pausedState = "paused"
}
result := map[string]interface{}{
"alertId": cmd.AlertId,
"state": response,
"message": "alert " + pausedState,
}
return Json(200, result)
}

View File

@@ -252,6 +252,7 @@ func Register(r *macaron.Macaron) {
r.Group("/alerts", func() {
r.Post("/test", bind(dtos.AlertTestCommand{}), wrap(AlertTest))
r.Post("/:alertId/pause", ValidateOrgAlert, bind(dtos.PauseAlertCommand{}), wrap(PauseAlert))
r.Get("/:alertId", ValidateOrgAlert, wrap(GetAlert))
r.Get("/", wrap(GetAlerts))
r.Get("/states-for-dashboard", wrap(GetAlertStatesForDashboard))

View File

@@ -29,7 +29,7 @@ var customMetricsDimensionsMap map[string]map[string]map[string]*CustomMetricsCa
func init() {
metricsMap = map[string][]string{
"AWS/ApiGateway": {"4XXError", "5XXError", "CacheHitCount", "CacheMissCount", "Count", "IntegrationLatency", "Latency"},
"AWS/ApplicationELB": {"ActiveConnectionCount", "ClientTLSNegotiationErrorCount", "HealthyHostCount", "HTTPCode_ELB_4XX_Count", "HTTPCode_ELB_5XX_Count", "HTTPCode_Target_2XX_Count", "HTTPCode_Target_3XX_Count", "HTTPCode_Target_4XX_Count", "HTTPCode_Target_5XX_Count", "NewConnectionCount", "ProcessedBytes", "RejectedConnectionCount", "RequestCount", "TargetConnectionErrorCount", "TargetResponseTime", "TargetTLSNegotiationErrorCount", "UnhealthyHostCount"},
"AWS/ApplicationELB": {"ActiveConnectionCount", "ClientTLSNegotiationErrorCount", "HealthyHostCount", "HTTPCode_ELB_4XX_Count", "HTTPCode_ELB_5XX_Count", "HTTPCode_Target_2XX_Count", "HTTPCode_Target_3XX_Count", "HTTPCode_Target_4XX_Count", "HTTPCode_Target_5XX_Count", "NewConnectionCount", "ProcessedBytes", "RejectedConnectionCount", "RequestCount", "TargetConnectionErrorCount", "TargetResponseTime", "TargetTLSNegotiationErrorCount", "UnHealthyHostCount"},
"AWS/AutoScaling": {"GroupMinSize", "GroupMaxSize", "GroupDesiredCapacity", "GroupInServiceInstances", "GroupPendingInstances", "GroupStandbyInstances", "GroupTerminatingInstances", "GroupTotalInstances"},
"AWS/Billing": {"EstimatedCharges"},
"AWS/CloudFront": {"Requests", "BytesDownloaded", "BytesUploaded", "TotalErrorRate", "4xxErrorRate", "5xxErrorRate"},

View File

@@ -153,16 +153,14 @@ func PostDashboard(c *middleware.Context, cmd m.SaveDashboardCommand) Response {
return ApiError(500, "Failed to save dashboard", err)
}
if setting.AlertingEnabled {
alertCmd := alerting.UpdateDashboardAlertsCommand{
OrgId: c.OrgId,
UserId: c.UserId,
Dashboard: cmd.Result,
}
alertCmd := alerting.UpdateDashboardAlertsCommand{
OrgId: c.OrgId,
UserId: c.UserId,
Dashboard: cmd.Result,
}
if err := bus.Dispatch(&alertCmd); err != nil {
return ApiError(500, "Failed to save alerts", err)
}
if err := bus.Dispatch(&alertCmd); err != nil {
return ApiError(500, "Failed to save alerts", err)
}
c.TimeRequest(metrics.M_Api_Dashboard_Save)

View File

@@ -58,3 +58,8 @@ type NotificationTestCommand struct {
Type string `json:"type"`
Settings *simplejson.Json `json:"settings"`
}
// PauseAlertCommand is the request body for POST /api/alerts/:alertId/pause.
// It is bound from JSON by the route handler and copied into the
// models.PauseAlertCommand dispatched on the bus.
type PauseAlertCommand struct {
	AlertId int64 `json:"alertId"` // id of the alert rule to pause/resume
	Paused  bool  `json:"paused"`  // true pauses the rule, false resumes it
}

View File

@@ -145,7 +145,6 @@ func getFrontendSettingsMap(c *middleware.Context) (map[string]interface{}, erro
"hasUpdate": plugins.GrafanaHasUpdate,
"env": setting.Env,
},
"alertingEnabled": setting.AlertingEnabled,
}
return jsonObj, nil

View File

@@ -36,6 +36,7 @@ func ReverseProxyGnetReq(proxyPath string) *httputil.ReverseProxy {
// clear cookie headers
req.Header.Del("Cookie")
req.Header.Del("Set-Cookie")
req.Header.Del("Authorization")
}
return &httputil.ReverseProxy{Director: director}

View File

@@ -102,7 +102,7 @@ func setIndexViewData(c *middleware.Context) (*dtos.IndexViewData, error) {
Children: dashboardChildNavs,
})
if setting.AlertingEnabled && (c.OrgRole == m.ROLE_ADMIN || c.OrgRole == m.ROLE_EDITOR) {
if c.OrgRole == m.ROLE_ADMIN || c.OrgRole == m.ROLE_EDITOR {
alertChildNavs := []*dtos.NavLink{
{Text: "Alert List", Url: setting.AppSubUrl + "/alerting/list"},
{Text: "Notifications", Url: setting.AppSubUrl + "/alerting/notifications"},

View File

@@ -1,9 +1,17 @@
package api
import (
"crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"github.com/grafana/grafana/pkg/bus"
@@ -14,6 +22,12 @@ import (
"github.com/grafana/grafana/pkg/social"
)
// GenStateString returns a random, base64-encoded string used as the
// OAuth2 "state" parameter to guard the login flow against CSRF. The
// value is stored in the session and compared on the callback.
//
// NOTE(review): the error from rand.Read is discarded; on an entropy
// failure the state could be all zeros — consider handling it.
func GenStateString() string {
	var buf [32]byte
	rand.Read(buf[:])
	return base64.StdEncoding.EncodeToString(buf[:])
}
func OAuthLogin(ctx *middleware.Context) {
if setting.OAuthService == nil {
ctx.Handle(404, "login.OAuthLogin(oauth service not enabled)", nil)
@@ -27,14 +41,63 @@ func OAuthLogin(ctx *middleware.Context) {
return
}
error := ctx.Query("error")
if error != "" {
errorDesc := ctx.Query("error_description")
ctx.Logger.Info("OAuthLogin Failed", "error", error, "errorDesc", errorDesc)
ctx.Redirect(setting.AppSubUrl + "/login?failCode=1003")
return
}
code := ctx.Query("code")
if code == "" {
ctx.Redirect(connect.AuthCodeURL("", oauth2.AccessTypeOnline))
state := GenStateString()
ctx.Session.Set(middleware.SESS_KEY_OAUTH_STATE, state)
ctx.Redirect(connect.AuthCodeURL(state, oauth2.AccessTypeOnline))
return
}
// verify state string
savedState := ctx.Session.Get(middleware.SESS_KEY_OAUTH_STATE).(string)
queryState := ctx.Query("state")
if savedState != queryState {
ctx.Handle(500, "login.OAuthLogin(state mismatch)", nil)
return
}
// handle call back
token, err := connect.Exchange(oauth2.NoContext, code)
// initialize oauth2 context
oauthCtx := oauth2.NoContext
if setting.OAuthService.OAuthInfos[name].TlsClientCert != "" {
cert, err := tls.LoadX509KeyPair(setting.OAuthService.OAuthInfos[name].TlsClientCert, setting.OAuthService.OAuthInfos[name].TlsClientKey)
if err != nil {
log.Fatal(err)
}
// Load CA cert
caCert, err := ioutil.ReadFile(setting.OAuthService.OAuthInfos[name].TlsClientCa)
if err != nil {
log.Fatal(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tr := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
},
}
sslcli := &http.Client{Transport: tr}
oauthCtx = context.TODO()
oauthCtx = context.WithValue(oauthCtx, oauth2.HTTPClient, sslcli)
}
// get token from provider
token, err := connect.Exchange(oauthCtx, code)
if err != nil {
ctx.Handle(500, "login.OAuthLogin(NewTransportWithCode)", err)
return
@@ -42,7 +105,11 @@ func OAuthLogin(ctx *middleware.Context) {
ctx.Logger.Debug("OAuthLogin Got token")
userInfo, err := connect.UserInfo(token)
// set up oauth2 client
client := connect.Client(oauthCtx, token)
// get user info
userInfo, err := connect.UserInfo(client)
if err != nil {
if err == social.ErrMissingTeamMembership {
ctx.Redirect(setting.AppSubUrl + "/login?failCode=1000")
@@ -82,7 +149,7 @@ func OAuthLogin(ctx *middleware.Context) {
return
}
cmd := m.CreateUserCommand{
Login: userInfo.Email,
Login: userInfo.Login,
Email: userInfo.Email,
Name: userInfo.Name,
Company: userInfo.Company,

View File

@@ -149,6 +149,9 @@ func createRequest(repoUrl string, subPaths ...string) ([]byte, error) {
}
res, err := HttpClient.Do(req)
if res.StatusCode/100 != 2 {
return []byte{}, fmt.Errorf("Api returned invalid status: %s", res.Status)
}
body, err := ioutil.ReadAll(res.Body)
defer res.Body.Close()

View File

@@ -20,6 +20,7 @@ import (
_ "github.com/grafana/grafana/pkg/services/alerting/conditions"
_ "github.com/grafana/grafana/pkg/services/alerting/notifiers"
_ "github.com/grafana/grafana/pkg/tsdb/graphite"
_ "github.com/grafana/grafana/pkg/tsdb/influxdb"
_ "github.com/grafana/grafana/pkg/tsdb/opentsdb"
_ "github.com/grafana/grafana/pkg/tsdb/prometheus"
_ "github.com/grafana/grafana/pkg/tsdb/testdata"

View File

@@ -59,7 +59,7 @@ func (g *GrafanaServerImpl) Start() {
plugins.Init()
// init alerting
if setting.AlertingEnabled {
if setting.ExecuteAlerts {
engine := alerting.NewEngine()
g.childRoutines.Go(func() error { return engine.Run(g.context) })
}

View File

@@ -10,6 +10,13 @@ type ImageUploader interface {
Upload(path string) (string, error)
}
// NopImageUploader is an ImageUploader that does nothing. It is returned
// when no image upload provider is configured, so callers never have to
// branch on a nil uploader.
type NopImageUploader struct{}

// Upload implements ImageUploader by discarding the path and reporting
// success with an empty URL.
func (NopImageUploader) Upload(path string) (string, error) {
	return "", nil
}
func NewImageUploader() (ImageUploader, error) {
switch setting.ImageUploadProvider {
@@ -53,5 +60,5 @@ func NewImageUploader() (ImageUploader, error) {
return NewWebdavImageUploader(url, username, password)
}
return nil, fmt.Errorf("could not find specified provider")
return NopImageUploader{}, nil
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
)
type ldapAuther struct {
@@ -29,7 +30,7 @@ func (a *ldapAuther) Dial() error {
var err error
var certPool *x509.CertPool
if a.server.RootCACert != "" {
certPool := x509.NewCertPool()
certPool = x509.NewCertPool()
for _, caCertFile := range strings.Split(a.server.RootCACert, " ") {
if pem, err := ioutil.ReadFile(caCertFile); err != nil {
return err
@@ -132,8 +133,10 @@ func (a *ldapAuther) getGrafanaUserFor(ldapUser *ldapUserInfo) (*m.User, error)
// get user from grafana db
userQuery := m.GetUserByLoginQuery{LoginOrEmail: ldapUser.Username}
if err := bus.Dispatch(&userQuery); err != nil {
if err == m.ErrUserNotFound {
if err == m.ErrUserNotFound && setting.LdapAllowSignup {
return a.createGrafanaUser(ldapUser)
} else if err == m.ErrUserNotFound {
return nil, ErrInvalidCredentials
} else {
return nil, err
}

View File

@@ -13,6 +13,7 @@ import (
const (
SESS_KEY_USERID = "uid"
SESS_KEY_OAUTH_STATE = "state"
)
var sessionManager *session.Manager

View File

@@ -101,6 +101,12 @@ type SaveAlertsCommand struct {
Alerts []*Alert
}
// PauseAlertCommand pauses or resumes a single alert rule. It is
// dispatched on the bus and handled by sqlstore.PauseAlertRule, which
// flips the alert's state between paused and no-data.
type PauseAlertCommand struct {
	OrgId   int64 // org owning the alert; presumably for scoping — the sqlstore handler shown here does not read it, verify against caller
	AlertId int64 // id of the alert rule to update
	Paused  bool  // true pauses the rule, false resumes it
}
type SetAlertStateCommand struct {
AlertId int64
OrgId int64

View File

@@ -28,7 +28,7 @@ type ThresholdEvaluator struct {
Threshold float64
}
func newThresholdEvaludator(typ string, model *simplejson.Json) (*ThresholdEvaluator, error) {
func newThresholdEvaluator(typ string, model *simplejson.Json) (*ThresholdEvaluator, error) {
params := model.Get("params").MustArray()
if len(params) == 0 {
return nil, alerting.ValidationError{Reason: "Evaluator missing threshold parameter"}
@@ -111,7 +111,7 @@ func NewAlertEvaluator(model *simplejson.Json) (AlertEvaluator, error) {
}
if inSlice(typ, defaultTypes) {
return newThresholdEvaludator(typ, model)
return newThresholdEvaluator(typ, model)
}
if inSlice(typ, rangedTypes) {
@@ -122,7 +122,7 @@ func NewAlertEvaluator(model *simplejson.Json) (AlertEvaluator, error) {
return &NoDataEvaluator{}, nil
}
return nil, alerting.ValidationError{Reason: "Evaludator invalid evaluator type"}
return nil, alerting.ValidationError{Reason: "Evaluator invalid evaluator type: " + typ}
}
func inSlice(a string, list []string) bool {

View File

@@ -82,7 +82,7 @@ func (c *QueryCondition) executeQuery(context *alerting.EvalContext, timeRange *
req := c.getRequestForAlertRule(getDsInfo.Result, timeRange)
result := make(tsdb.TimeSeriesSlice, 0)
resp, err := c.HandleRequest(context.Context, req)
resp, err := c.HandleRequest(context.Ctx, req)
if err != nil {
return nil, fmt.Errorf("tsdb.HandleRequest() error %v", err)
}
@@ -123,6 +123,7 @@ func (c *QueryCondition) getRequestForAlertRule(datasource *m.DataSource, timeRa
BasicAuth: datasource.BasicAuth,
BasicAuthUser: datasource.BasicAuthUser,
BasicAuthPassword: datasource.BasicAuthPassword,
JsonData: datasource.JsonData,
},
},
},

View File

@@ -3,6 +3,8 @@ package conditions
import (
"testing"
"gopkg.in/guregu/null.v3"
"github.com/grafana/grafana/pkg/tsdb"
. "github.com/smartystreets/goconvey/convey"
)
@@ -43,7 +45,7 @@ func testReducer(typ string, datapoints ...float64) float64 {
}
for idx := range datapoints {
series.Points = append(series.Points, tsdb.NewTimePoint(datapoints[idx], 1234134))
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(datapoints[idx]), 1234134))
}
return reducer.Reduce(series).Float64

View File

@@ -28,23 +28,7 @@ type EvalContext struct {
NoDataFound bool
RetryCount int
Context context.Context
}
func (evalContext *EvalContext) Deadline() (deadline time.Time, ok bool) {
return evalContext.Deadline()
}
func (evalContext *EvalContext) Done() <-chan struct{} {
return evalContext.Context.Done()
}
func (evalContext *EvalContext) Err() error {
return evalContext.Context.Err()
}
func (evalContext *EvalContext) Value(key interface{}) interface{} {
return evalContext.Context.Value(key)
Ctx context.Context
}
type StateDescription struct {
@@ -103,6 +87,10 @@ func (c *EvalContext) GetDashboardSlug() (string, error) {
}
func (c *EvalContext) GetRuleUrl() (string, error) {
if c.IsTestRun {
return setting.AppUrl, nil
}
if slug, err := c.GetDashboardSlug(); err != nil {
return "", err
} else {
@@ -113,7 +101,7 @@ func (c *EvalContext) GetRuleUrl() (string, error) {
func NewEvalContext(alertCtx context.Context, rule *Rule) *EvalContext {
return &EvalContext{
Context: alertCtx,
Ctx: alertCtx,
StartTime: time.Now(),
Rule: rule,
Logs: make([]*ResultLogEntry, 0),

View File

@@ -80,6 +80,11 @@ func (e *DashAlertExtractor) GetAlerts() ([]*m.Alert, error) {
continue
}
frequency, err := getTimeDurationStringToSeconds(jsonAlert.Get("frequency").MustString())
if err != nil {
return nil, ValidationError{Reason: "Could not parse frequency"}
}
alert := &m.Alert{
DashboardId: e.Dash.Id,
OrgId: e.OrgId,
@@ -88,7 +93,7 @@ func (e *DashAlertExtractor) GetAlerts() ([]*m.Alert, error) {
Name: jsonAlert.Get("name").MustString(),
Handler: jsonAlert.Get("handler").MustInt64(),
Message: jsonAlert.Get("message").MustString(),
Frequency: getTimeDurationStringToSeconds(jsonAlert.Get("frequency").MustString()),
Frequency: frequency,
}
for _, condition := range jsonAlert.Get("conditions").MustArray() {
@@ -115,13 +120,17 @@ func (e *DashAlertExtractor) GetAlerts() ([]*m.Alert, error) {
jsonQuery.SetPath([]string{"datasourceId"}, datasource.Id)
}
if interval, err := panel.Get("interval").String(); err == nil {
panelQuery.Set("interval", interval)
}
jsonQuery.Set("model", panelQuery.Interface())
}
alert.Settings = jsonAlert
// validate
_, err := NewRuleFromDBAlert(alert)
_, err = NewRuleFromDBAlert(alert)
if err == nil && alert.ValidToSave() {
alerts = append(alerts, alert)
} else {

View File

@@ -6,6 +6,7 @@ import (
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/simplejson"
m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
. "github.com/smartystreets/goconvey/convey"
)
@@ -17,8 +18,35 @@ func TestAlertRuleExtraction(t *testing.T) {
return &FakeCondition{}, nil
})
Convey("Parsing and validating alerts from dashboards", func() {
json := `{
setting.NewConfigContext(&setting.CommandLineArgs{
HomePath: "../../../",
})
// mock data
defaultDs := &m.DataSource{Id: 12, OrgId: 1, Name: "I am default", IsDefault: true}
graphite2Ds := &m.DataSource{Id: 15, OrgId: 1, Name: "graphite2"}
influxDBDs := &m.DataSource{Id: 16, OrgId: 1, Name: "InfluxDB"}
bus.AddHandler("test", func(query *m.GetDataSourcesQuery) error {
query.Result = []*m.DataSource{defaultDs, graphite2Ds}
return nil
})
bus.AddHandler("test", func(query *m.GetDataSourceByNameQuery) error {
if query.Name == defaultDs.Name {
query.Result = defaultDs
}
if query.Name == graphite2Ds.Name {
query.Result = graphite2Ds
}
if query.Name == influxDBDs.Name {
query.Result = influxDBDs
}
return nil
})
json := `
{
"id": 57,
"title": "Graphite 4",
"originalTitle": "Graphite 4",
@@ -80,32 +108,16 @@ func TestAlertRuleExtraction(t *testing.T) {
]
}
]
}`
}`
Convey("Parsing and validating dashboard containing graphite alerts", func() {
dashJson, err := simplejson.NewJson([]byte(json))
So(err, ShouldBeNil)
dash := m.NewDashboardFromJson(dashJson)
extractor := NewDashAlertExtractor(dash, 1)
// mock data
defaultDs := &m.DataSource{Id: 12, OrgId: 2, Name: "I am default", IsDefault: true}
graphite2Ds := &m.DataSource{Id: 15, OrgId: 2, Name: "graphite2"}
bus.AddHandler("test", func(query *m.GetDataSourcesQuery) error {
query.Result = []*m.DataSource{defaultDs, graphite2Ds}
return nil
})
bus.AddHandler("test", func(query *m.GetDataSourceByNameQuery) error {
if query.Name == defaultDs.Name {
query.Result = defaultDs
}
if query.Name == graphite2Ds.Name {
query.Result = graphite2Ds
}
return nil
})
alerts, err := extractor.GetAlerts()
Convey("Get rules without error", func() {
@@ -119,6 +131,9 @@ func TestAlertRuleExtraction(t *testing.T) {
So(v.DashboardId, ShouldEqual, 57)
So(v.Name, ShouldNotBeEmpty)
So(v.Message, ShouldNotBeEmpty)
settings := simplejson.NewFromAny(v.Settings)
So(settings.Get("interval").MustString(""), ShouldEqual, "")
}
Convey("should extract handler property", func() {
@@ -156,5 +171,317 @@ func TestAlertRuleExtraction(t *testing.T) {
})
})
})
Convey("Parse and validate dashboard containing influxdb alert", func() {
json2 := `{
"id": 4,
"title": "Influxdb",
"tags": [
"apa"
],
"style": "dark",
"timezone": "browser",
"editable": true,
"hideControls": false,
"sharedCrosshair": false,
"rows": [
{
"collapse": false,
"editable": true,
"height": "450px",
"panels": [
{
"alert": {
"conditions": [
{
"evaluator": {
"params": [
10
],
"type": "gt"
},
"query": {
"params": [
"B",
"5m",
"now"
]
},
"reducer": {
"params": [],
"type": "avg"
},
"type": "query"
}
],
"frequency": "3s",
"handler": 1,
"name": "Influxdb",
"noDataState": "no_data",
"notifications": [
{
"id": 6
}
]
},
"alerting": {},
"aliasColors": {
"logins.count.count": "#890F02"
},
"bars": false,
"datasource": "InfluxDB",
"editable": true,
"error": false,
"fill": 1,
"grid": {},
"id": 1,
"interval": ">10s",
"isNew": true,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 2,
"links": [],
"nullPointMode": "connected",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"span": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"dsType": "influxdb",
"groupBy": [
{
"params": [
"$interval"
],
"type": "time"
},
{
"params": [
"datacenter"
],
"type": "tag"
},
{
"params": [
"none"
],
"type": "fill"
}
],
"hide": false,
"measurement": "logins.count",
"policy": "default",
"query": "SELECT 8 * count(\"value\") FROM \"logins.count\" WHERE $timeFilter GROUP BY time($interval), \"datacenter\" fill(none)",
"rawQuery": true,
"refId": "B",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"value"
],
"type": "field"
},
{
"params": [],
"type": "count"
}
]
],
"tags": []
},
{
"dsType": "influxdb",
"groupBy": [
{
"params": [
"$interval"
],
"type": "time"
},
{
"params": [
"null"
],
"type": "fill"
}
],
"hide": true,
"measurement": "cpu",
"policy": "default",
"refId": "A",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"value"
],
"type": "field"
},
{
"params": [],
"type": "mean"
}
],
[
{
"params": [
"value"
],
"type": "field"
},
{
"params": [],
"type": "sum"
}
]
],
"tags": []
}
],
"thresholds": [
{
"colorMode": "critical",
"fill": true,
"line": true,
"op": "gt",
"value": 10
}
],
"timeFrom": null,
"timeShift": null,
"title": "Panel Title",
"tooltip": {
"msResolution": false,
"ordering": "alphabetical",
"shared": true,
"sort": 0,
"value_type": "cumulative"
},
"type": "graph",
"xaxis": {
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"logBase": 1,
"max": null,
"min": null,
"show": true
}
]
},
{
"editable": true,
"error": false,
"id": 2,
"isNew": true,
"limit": 10,
"links": [],
"show": "current",
"span": 2,
"stateFilter": [
"alerting"
],
"title": "Alert status",
"type": "alertlist"
}
],
"title": "Row"
}
],
"time": {
"from": "now-5m",
"to": "now"
},
"timepicker": {
"now": true,
"refresh_intervals": [
"5s",
"10s",
"30s",
"1m",
"5m",
"15m",
"30m",
"1h",
"2h",
"1d"
],
"time_options": [
"5m",
"15m",
"1h",
"6h",
"12h",
"24h",
"2d",
"7d",
"30d"
]
},
"templating": {
"list": []
},
"annotations": {
"list": []
},
"schemaVersion": 13,
"version": 120,
"links": [],
"gnetId": null
}`
dashJson, err := simplejson.NewJson([]byte(json2))
So(err, ShouldBeNil)
dash := m.NewDashboardFromJson(dashJson)
extractor := NewDashAlertExtractor(dash, 1)
alerts, err := extractor.GetAlerts()
Convey("Get rules without error", func() {
So(err, ShouldBeNil)
})
Convey("should be able to read interval", func() {
So(len(alerts), ShouldEqual, 1)
for _, alert := range alerts {
So(alert.DashboardId, ShouldEqual, 4)
conditions := alert.Settings.Get("conditions").MustArray()
cond := simplejson.NewFromAny(conditions[0])
So(cond.Get("query").Get("model").Get("interval").MustString(), ShouldEqual, ">10s")
}
})
})
})
}

View File

@@ -16,6 +16,9 @@ type Notifier interface {
GetType() string
NeedsImage() bool
PassesFilter(rule *Rule) bool
GetNotifierId() int64
GetIsDefault() bool
}
type Condition interface {

View File

@@ -35,14 +35,22 @@ func (n *RootNotifier) PassesFilter(rule *Rule) bool {
return false
}
func (n *RootNotifier) Notify(context *EvalContext) error {
n.log.Info("Sending notifications for", "ruleId", context.Rule.Id)
// GetNotifierId implements Notifier. The root notifier is not backed by
// a stored notification row, so it has no real id.
func (n *RootNotifier) GetNotifierId() int64 {
	return 0
}

// GetIsDefault implements Notifier. The root notifier itself is never a
// user-configured "default" notification.
func (n *RootNotifier) GetIsDefault() bool {
	return false
}
func (n *RootNotifier) Notify(context *EvalContext) error {
notifiers, err := n.getNotifiers(context.Rule.OrgId, context.Rule.Notifications, context)
if err != nil {
return err
}
n.log.Info("Sending notifications for", "ruleId", context.Rule.Id, "Amount to send", len(notifiers))
if len(notifiers) == 0 {
return nil
}
@@ -57,11 +65,12 @@ func (n *RootNotifier) Notify(context *EvalContext) error {
}
func (n *RootNotifier) sendNotifications(context *EvalContext, notifiers []Notifier) error {
g, _ := errgroup.WithContext(context.Context)
g, _ := errgroup.WithContext(context.Ctx)
for _, notifier := range notifiers {
n.log.Info("Sending notification", "firing", context.Firing, "type", notifier.GetType())
g.Go(func() error { return notifier.Notify(context) })
not := notifier //avoid updating scope variable in go routine
n.log.Info("Sending notification", "type", not.GetType(), "id", not.GetNotifierId(), "isDefault", not.GetIsDefault())
g.Go(func() error { return not.Notify(context) })
}
return g.Wait()

View File

@@ -22,6 +22,14 @@ func (fn *FakeNotifier) NeedsImage() bool {
return true
}
// GetNotifierId implements Notifier for tests; the fake has no backing
// notification row, so it always reports id 0.
func (n *FakeNotifier) GetNotifierId() int64 {
	return 0
}

// GetIsDefault implements Notifier for tests; the fake is never treated
// as a default notification.
func (n *FakeNotifier) GetIsDefault() bool {
	return false
}
func (fn *FakeNotifier) Notify(alertResult *EvalContext) error { return nil }
func (fn *FakeNotifier) PassesFilter(rule *Rule) bool {

View File

@@ -6,13 +6,19 @@ import (
)
type NotifierBase struct {
Name string
Type string
Name string
Type string
Id int64
IsDeault bool
}
func NewNotifierBase(name, notifierType string, model *simplejson.Json) NotifierBase {
base := NotifierBase{Name: name, Type: notifierType}
return base
// NewNotifierBase builds the NotifierBase embedded by every concrete
// notifier (email, slack, webhook, ...), carrying the stored
// notification's id, name, type and default flag.
// The model parameter is not read here; it is kept in the signature so
// concrete constructors can pass the raw settings through uniformly.
func NewNotifierBase(id int64, isDefault bool, name, notifierType string, model *simplejson.Json) NotifierBase {
	return NotifierBase{
		Id:       id,
		Name:     name,
		IsDeault: isDefault, // NOTE(review): "IsDeault" is a typo in the struct field name, preserved for compatibility
		Type:     notifierType,
	}
}
func (n *NotifierBase) PassesFilter(rule *alerting.Rule) bool {
@@ -26,3 +32,11 @@ func (n *NotifierBase) GetType() string {
func (n *NotifierBase) NeedsImage() bool {
return true
}
// GetNotifierId returns the database id of the stored notification this
// notifier was built from (0 for synthetic notifiers).
func (n *NotifierBase) GetNotifierId() int64 {
	return n.Id
}

// GetIsDefault reports whether this notification is marked as a default.
// NOTE(review): the backing field name "IsDeault" is a typo kept for
// compatibility with the struct definition.
func (n *NotifierBase) GetIsDefault() bool {
	return n.IsDeault
}

View File

@@ -29,7 +29,7 @@ func NewEmailNotifier(model *m.AlertNotification) (alerting.Notifier, error) {
}
return &EmailNotifier{
NotifierBase: NewNotifierBase(model.Name, model.Type, model.Settings),
NotifierBase: NewNotifierBase(model.Id, model.IsDefault, model.Name, model.Type, model.Settings),
Addresses: strings.Split(addressesString, "\n"),
log: log.New("alerting.notifier.email"),
}, nil
@@ -63,7 +63,7 @@ func (this *EmailNotifier) Notify(evalContext *alerting.EvalContext) error {
},
}
err = bus.DispatchCtx(evalContext, cmd)
err = bus.DispatchCtx(evalContext.Ctx, cmd)
if err != nil {
this.log.Error("Failed to send alert notification email", "error", err)

View File

@@ -23,7 +23,7 @@ func NewSlackNotifier(model *m.AlertNotification) (alerting.Notifier, error) {
}
return &SlackNotifier{
NotifierBase: NewNotifierBase(model.Name, model.Type, model.Settings),
NotifierBase: NewNotifierBase(model.Id, model.IsDefault, model.Name, model.Type, model.Settings),
Url: url,
log: log.New("alerting.notifier.slack"),
}, nil
@@ -90,7 +90,7 @@ func (this *SlackNotifier) Notify(evalContext *alerting.EvalContext) error {
data, _ := json.Marshal(&body)
cmd := &m.SendWebhookSync{Url: this.Url, Body: string(data)}
if err := bus.DispatchCtx(evalContext, cmd); err != nil {
if err := bus.DispatchCtx(evalContext.Ctx, cmd); err != nil {
this.log.Error("Failed to send slack notification", "error", err, "webhook", this.Name)
}

View File

@@ -20,7 +20,7 @@ func NewWebHookNotifier(model *m.AlertNotification) (alerting.Notifier, error) {
}
return &WebhookNotifier{
NotifierBase: NewNotifierBase(model.Name, model.Type, model.Settings),
NotifierBase: NewNotifierBase(model.Id, model.IsDefault, model.Name, model.Type, model.Settings),
Url: url,
User: model.Settings.Get("user").MustString(),
Password: model.Settings.Get("password").MustString(),
@@ -65,7 +65,7 @@ func (this *WebhookNotifier) Notify(evalContext *alerting.EvalContext) error {
Body: string(body),
}
if err := bus.DispatchCtx(evalContext, cmd); err != nil {
if err := bus.DispatchCtx(evalContext.Ctx, cmd); err != nil {
this.log.Error("Failed to send webhook", "error", err, "webhook", this.Name)
}

View File

@@ -43,17 +43,27 @@ var unitMultiplier = map[string]int{
"h": 3600,
}
func getTimeDurationStringToSeconds(str string) int64 {
func getTimeDurationStringToSeconds(str string) (int64, error) {
multiplier := 1
value, _ := strconv.Atoi(ValueFormatRegex.FindAllString(str, 1)[0])
matches := ValueFormatRegex.FindAllString(str, 1)
if len(matches) <= 0 {
return 0, fmt.Errorf("Frequency could not be parsed")
}
value, err := strconv.Atoi(matches[0])
if err != nil {
return 0, err
}
unit := UnitFormatRegex.FindAllString(str, 1)[0]
if val, ok := unitMultiplier[unit]; ok {
multiplier = val
}
return int64(value * multiplier)
return int64(value * multiplier), nil
}
func NewRuleFromDBAlert(ruleDef *m.Alert) (*Rule, error) {

View File

@@ -20,25 +20,30 @@ func TestAlertRuleModel(t *testing.T) {
})
Convey("Can parse seconds", func() {
seconds := getTimeDurationStringToSeconds("10s")
seconds, _ := getTimeDurationStringToSeconds("10s")
So(seconds, ShouldEqual, 10)
})
Convey("Can parse minutes", func() {
seconds := getTimeDurationStringToSeconds("10m")
seconds, _ := getTimeDurationStringToSeconds("10m")
So(seconds, ShouldEqual, 600)
})
Convey("Can parse hours", func() {
seconds := getTimeDurationStringToSeconds("1h")
seconds, _ := getTimeDurationStringToSeconds("1h")
So(seconds, ShouldEqual, 3600)
})
Convey("defaults to seconds", func() {
seconds := getTimeDurationStringToSeconds("1o")
seconds, _ := getTimeDurationStringToSeconds("1o")
So(seconds, ShouldEqual, 1)
})
Convey("should return err for empty string", func() {
_, err := getTimeDurationStringToSeconds("")
So(err, ShouldNotBeNil)
})
Convey("can construct alert rule model", func() {
json := `
{

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
)
type SchedulerImpl struct {
@@ -48,7 +49,7 @@ func (s *SchedulerImpl) Tick(tickTime time.Time, execQueue chan *Job) {
now := tickTime.Unix()
for _, job := range s.jobs {
if job.Running {
if job.Running || job.Rule.State == models.AlertStatePaused {
continue
}

View File

@@ -18,6 +18,7 @@ func init() {
bus.AddHandler("sql", GetAllAlertQueryHandler)
bus.AddHandler("sql", SetAlertState)
bus.AddHandler("sql", GetAlertStatesForDashboard)
bus.AddHandler("sql", PauseAlertRule)
}
func GetAlertById(query *m.GetAlertByIdQuery) error {
@@ -243,6 +244,29 @@ func SetAlertState(cmd *m.SetAlertStateCommand) error {
})
}
// PauseAlertRule pauses or resumes an alert rule inside a transaction by
// flipping its state between AlertStatePaused and AlertStateNoData.
// It returns an error if the alert does not exist or the update fails.
//
// NOTE(review): cmd.OrgId is not used to scope the lookup here; callers
// are expected to have validated org ownership (e.g. via the
// ValidateOrgAlert route middleware) — confirm before exposing elsewhere.
func PauseAlertRule(cmd *m.PauseAlertCommand) error {
	return inTransaction(func(sess *xorm.Session) error {
		alert := m.Alert{}
		if has, err := sess.Id(cmd.AlertId).Get(&alert); err != nil {
			return err
		} else if !has {
			return fmt.Errorf("Could not find alert")
		}

		// Paused is modeled as a state; resuming falls back to "no data"
		// until the scheduler evaluates the rule again.
		if cmd.Paused {
			alert.State = m.AlertStatePaused
		} else {
			alert.State = m.AlertStateNoData
		}

		// Persist the new state. The original code ignored this error,
		// reporting success even when the update failed.
		if _, err := sess.Id(alert.Id).Update(&alert); err != nil {
			return err
		}
		return nil
	})
}
func GetAlertStatesForDashboard(query *m.GetAlertStatesForDashboardQuery) error {
var rawSql = `SELECT
id,

View File

@@ -134,8 +134,9 @@ var (
GoogleTagManagerId string
// LDAP
LdapEnabled bool
LdapConfigFile string
LdapEnabled bool
LdapConfigFile string
LdapAllowSignup bool = true
// SMTP email settings
Smtp SmtpSettings
@@ -144,7 +145,7 @@ var (
Quota QuotaSettings
// Alerting
AlertingEnabled bool
ExecuteAlerts bool
// logger
logger log.Logger
@@ -460,7 +461,7 @@ func NewConfigContext(args *CommandLineArgs) error {
Env = Cfg.Section("").Key("app_mode").MustString("development")
InstanceName = Cfg.Section("").Key("instance_name").MustString("unknown_instance_name")
PluginsPath = Cfg.Section("paths").Key("plugins").String()
PluginsPath = makeAbsolute(Cfg.Section("paths").Key("plugins").String(), HomePath)
server := Cfg.Section("server")
AppUrl, AppSubUrl = parseAppUrlAndSubUrl(server)
@@ -551,9 +552,10 @@ func NewConfigContext(args *CommandLineArgs) error {
ldapSec := Cfg.Section("auth.ldap")
LdapEnabled = ldapSec.Key("enabled").MustBool(false)
LdapConfigFile = ldapSec.Key("config_file").String()
LdapAllowSignup = ldapSec.Key("allow_sign_up").MustBool(true)
alerting := Cfg.Section("alerting")
AlertingEnabled = alerting.Key("enabled").MustBool(false)
ExecuteAlerts = alerting.Key("execute_alerts").MustBool(true)
readSessionConfig()
readSmtpSettings()

View File

@@ -9,6 +9,9 @@ type OAuthInfo struct {
ApiUrl string
AllowSignup bool
Name string
TlsClientCert string
TlsClientKey string
TlsClientCa string
}
type OAuther struct {

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"net/http"
"strconv"
"github.com/grafana/grafana/pkg/models"
@@ -160,15 +159,16 @@ func (s *GenericOAuth) FetchOrganizations(client *http.Client) ([]string, error)
return logins, nil
}
func (s *GenericOAuth) UserInfo(token *oauth2.Token) (*BasicUserInfo, error) {
func (s *GenericOAuth) UserInfo(client *http.Client) (*BasicUserInfo, error) {
var data struct {
Id int `json:"id"`
Name string `json:"login"`
Email string `json:"email"`
Name string `json:"name"`
Login string `json:"login"`
Username string `json:"username"`
Email string `json:"email"`
Attributes map[string][]string `json:"attributes"`
}
var err error
client := s.Client(oauth2.NoContext, token)
r, err := client.Get(s.apiUrl)
if err != nil {
return nil, err
@@ -181,11 +181,30 @@ func (s *GenericOAuth) UserInfo(token *oauth2.Token) (*BasicUserInfo, error) {
}
userInfo := &BasicUserInfo{
Identity: strconv.Itoa(data.Id),
Name: data.Name,
Login: data.Login,
Email: data.Email,
}
if (userInfo.Email == "" && data.Attributes["email:primary"] != nil) {
userInfo.Email = data.Attributes["email:primary"][0]
}
if userInfo.Email == "" {
userInfo.Email, err = s.FetchPrivateEmail(client)
if err != nil {
return nil, err
}
}
if (userInfo.Login == "" && data.Username != "") {
userInfo.Login = data.Username
}
if (userInfo.Login == "") {
userInfo.Login = data.Email
}
if !s.IsTeamMember(client) {
return nil, errors.New("User not a member of one of the required teams")
}
@@ -194,12 +213,5 @@ func (s *GenericOAuth) UserInfo(token *oauth2.Token) (*BasicUserInfo, error) {
return nil, errors.New("User not a member of one of the required organizations")
}
if userInfo.Email == "" {
userInfo.Email, err = s.FetchPrivateEmail(client)
if err != nil {
return nil, err
}
}
return userInfo, nil
}

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"net/http"
"strconv"
"github.com/grafana/grafana/pkg/models"
@@ -168,15 +167,14 @@ func (s *SocialGithub) FetchOrganizations(client *http.Client) ([]string, error)
return logins, nil
}
func (s *SocialGithub) UserInfo(token *oauth2.Token) (*BasicUserInfo, error) {
func (s *SocialGithub) UserInfo(client *http.Client) (*BasicUserInfo, error) {
var data struct {
Id int `json:"id"`
Name string `json:"login"`
Login string `json:"login"`
Email string `json:"email"`
}
var err error
client := s.Client(oauth2.NoContext, token)
r, err := client.Get(s.apiUrl)
if err != nil {
return nil, err
@@ -189,8 +187,8 @@ func (s *SocialGithub) UserInfo(token *oauth2.Token) (*BasicUserInfo, error) {
}
userInfo := &BasicUserInfo{
Identity: strconv.Itoa(data.Id),
Name: data.Name,
Name: data.Login,
Login: data.Login,
Email: data.Email,
}

View File

@@ -2,6 +2,7 @@ package social
import (
"encoding/json"
"net/http"
"github.com/grafana/grafana/pkg/models"
@@ -27,15 +28,13 @@ func (s *SocialGoogle) IsSignupAllowed() bool {
return s.allowSignup
}
func (s *SocialGoogle) UserInfo(token *oauth2.Token) (*BasicUserInfo, error) {
func (s *SocialGoogle) UserInfo(client *http.Client) (*BasicUserInfo, error) {
var data struct {
Id string `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
}
var err error
client := s.Client(oauth2.NoContext, token)
r, err := client.Get(s.apiUrl)
if err != nil {
return nil, err
@@ -45,7 +44,6 @@ func (s *SocialGoogle) UserInfo(token *oauth2.Token) (*BasicUserInfo, error) {
return nil, err
}
return &BasicUserInfo{
Identity: data.Id,
Name: data.Name,
Email: data.Email,
}, nil

View File

@@ -2,9 +2,7 @@ package social
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"github.com/grafana/grafana/pkg/models"
@@ -18,6 +16,10 @@ type SocialGrafanaNet struct {
allowSignup bool
}
type OrgRecord struct {
Login string `json:"login"`
}
func (s *SocialGrafanaNet) Type() int {
return int(models.GRAFANANET)
}
@@ -30,19 +32,14 @@ func (s *SocialGrafanaNet) IsSignupAllowed() bool {
return s.allowSignup
}
func (s *SocialGrafanaNet) IsOrganizationMember(client *http.Client) bool {
func (s *SocialGrafanaNet) IsOrganizationMember(organizations []OrgRecord) bool {
if len(s.allowedOrganizations) == 0 {
return true
}
organizations, err := s.FetchOrganizations(client)
if err != nil {
return false
}
for _, allowedOrganization := range s.allowedOrganizations {
for _, organization := range organizations {
if organization == allowedOrganization {
if organization.Login == allowedOrganization {
return true
}
}
@@ -51,43 +48,16 @@ func (s *SocialGrafanaNet) IsOrganizationMember(client *http.Client) bool {
return false
}
func (s *SocialGrafanaNet) FetchOrganizations(client *http.Client) ([]string, error) {
type Record struct {
Login string `json:"login"`
}
url := fmt.Sprintf(s.url + "/api/oauth2/user/orgs")
r, err := client.Get(url)
if err != nil {
return nil, err
}
defer r.Body.Close()
var records []Record
if err = json.NewDecoder(r.Body).Decode(&records); err != nil {
return nil, err
}
var logins = make([]string, len(records))
for i, record := range records {
logins[i] = record.Login
}
return logins, nil
}
func (s *SocialGrafanaNet) UserInfo(token *oauth2.Token) (*BasicUserInfo, error) {
func (s *SocialGrafanaNet) UserInfo(client *http.Client) (*BasicUserInfo, error) {
var data struct {
Id int `json:"id"`
Name string `json:"login"`
Name string `json:"name"`
Login string `json:"username"`
Email string `json:"email"`
Role string `json:"role"`
Orgs []OrgRecord `json:"orgs"`
}
var err error
client := s.Client(oauth2.NoContext, token)
r, err := client.Get(s.url + "/api/oauth2/user")
if err != nil {
return nil, err
@@ -100,13 +70,13 @@ func (s *SocialGrafanaNet) UserInfo(token *oauth2.Token) (*BasicUserInfo, error)
}
userInfo := &BasicUserInfo{
Identity: strconv.Itoa(data.Id),
Name: data.Name,
Login: data.Login,
Email: data.Email,
Role: data.Role,
}
if !s.IsOrganizationMember(client) {
if !s.IsOrganizationMember(data.Orgs) {
return nil, ErrMissingOrganizationMembership
}

View File

@@ -1,16 +1,16 @@
package social
import (
"net/http"
"strings"
"github.com/grafana/grafana/pkg/setting"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"github.com/grafana/grafana/pkg/setting"
)
type BasicUserInfo struct {
Identity string
Name string
Email string
Login string
@@ -20,12 +20,13 @@ type BasicUserInfo struct {
type SocialConnector interface {
Type() int
UserInfo(token *oauth2.Token) (*BasicUserInfo, error)
UserInfo(client *http.Client) (*BasicUserInfo, error)
IsEmailAllowed(email string) bool
IsSignupAllowed() bool
AuthCodeURL(state string, opts ...oauth2.AuthCodeOption) string
Exchange(ctx context.Context, code string) (*oauth2.Token, error)
Client(ctx context.Context, t *oauth2.Token) *http.Client
}
var (
@@ -52,6 +53,9 @@ func NewOAuthService() {
AllowedDomains: sec.Key("allowed_domains").Strings(" "),
AllowSignup: sec.Key("allow_sign_up").MustBool(),
Name: sec.Key("name").MustString(name),
TlsClientCert: sec.Key("tls_client_cert").String(),
TlsClientKey: sec.Key("tls_client_key").String(),
TlsClientCa: sec.Key("tls_client_ca").String(),
}
if !info.Enabled {
@@ -59,6 +63,7 @@ func NewOAuthService() {
}
setting.OAuthService.OAuthInfos[name] = info
config := oauth2.Config{
ClientID: info.ClientId,
ClientSecret: info.ClientSecret,
@@ -85,9 +90,10 @@ func NewOAuthService() {
// Google.
if name == "google" {
SocialMap["google"] = &SocialGoogle{
Config: &config, allowedDomains: info.AllowedDomains,
apiUrl: info.ApiUrl,
allowSignup: info.AllowSignup,
Config: &config,
allowedDomains: info.AllowedDomains,
apiUrl: info.ApiUrl,
allowSignup: info.AllowSignup,
}
}
@@ -104,15 +110,15 @@ func NewOAuthService() {
}
if name == "grafananet" {
config := oauth2.Config{
config = oauth2.Config{
ClientID: info.ClientId,
ClientSecret: info.ClientSecret,
Endpoint: oauth2.Endpoint{
AuthURL: setting.GrafanaNetUrl + "/oauth2/authorize",
TokenURL: setting.GrafanaNetUrl + "/api/oauth2/token",
Endpoint: oauth2.Endpoint{
AuthURL: setting.GrafanaNetUrl + "/oauth2/authorize",
TokenURL: setting.GrafanaNetUrl + "/api/oauth2/token",
},
RedirectURL: strings.TrimSuffix(setting.AppUrl, "/") + SocialBaseUrl + name,
Scopes: info.Scopes,
RedirectURL: strings.TrimSuffix(setting.AppUrl, "/") + SocialBaseUrl + name,
Scopes: info.Scopes,
}
SocialMap["grafananet"] = &SocialGrafanaNet{

View File

@@ -3,7 +3,7 @@ package tsdb
import "context"
type Executor interface {
Execute(ctx context.Context, queries QuerySlice, context *QueryContext) *BatchResult
Execute(ctx context.Context, queries QuerySlice, query *QueryContext) *BatchResult
}
var registry map[string]GetExecutorFn

View File

@@ -57,7 +57,11 @@ func (e *GraphiteExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice,
}
for _, query := range queries {
formData["target"] = []string{query.Model.Get("target").MustString()}
if fullTarget, err := query.Model.Get("targetFull").String(); err == nil {
formData["target"] = []string{fullTarget}
} else {
formData["target"] = []string{query.Model.Get("target").MustString()}
}
}
if setting.Env == setting.DEV {

View File

@@ -0,0 +1,133 @@
package influxdb
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"net/url"
"path"
"time"
"golang.org/x/net/context/ctxhttp"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/tsdb"
)
// InfluxDBExecutor runs Grafana panel queries against an InfluxDB data
// source: it parses the panel's query model, renders InfluxQL, performs
// the HTTP request and converts the response into tsdb time series.
type InfluxDBExecutor struct {
	*tsdb.DataSourceInfo
	QueryParser    *InfluxdbQueryParser
	QueryBuilder   *QueryBuilder
	ResponseParser *ResponseParser
}

// NewInfluxDBExecutor builds an executor bound to the given data source.
// It is registered as the "influxdb" executor factory in init.
func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
	return &InfluxDBExecutor{
		DataSourceInfo: dsInfo,
		QueryParser:    &InfluxdbQueryParser{},
		QueryBuilder:   &QueryBuilder{},
		ResponseParser: &ResponseParser{},
	}
}
var (
	glog log.Logger
	// HttpClient is shared by all InfluxDB requests.
	HttpClient *http.Client
)

func init() {
	glog = log.New("tsdb.influxdb")
	tsdb.RegisterExecutor("influxdb", NewInfluxDBExecutor)

	// NOTE(review): TLS certificate verification is disabled for every
	// InfluxDB request; consider making this configurable per data source.
	tr := &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	HttpClient = &http.Client{
		Timeout:   time.Duration(15 * time.Second),
		Transport: tr,
	}
}
// Execute renders the first query in queries into InfluxQL, sends it to
// the data source and parses the response.
// NOTE(review): the result is always stored under key "A" regardless of
// the query's refId — confirm callers rely on that.
func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult {
	result := &tsdb.BatchResult{}

	query, err := e.getQuery(queries, queryContext)
	if err != nil {
		return result.WithError(err)
	}

	glog.Debug("Influxdb query", "raw query", query)

	req, err := e.createRequest(query)
	if err != nil {
		return result.WithError(err)
	}

	resp, err := ctxhttp.Do(ctx, HttpClient, req)
	if err != nil {
		return result.WithError(err)
	}
	// Always close the body so the transport can reuse the connection
	// (it was previously leaked on every request).
	defer resp.Body.Close()

	if resp.StatusCode/100 != 2 {
		return result.WithError(fmt.Errorf("Influxdb returned invalid status code: %v", resp.Status))
	}

	var response Response
	// UseNumber keeps values as json.Number so ints and floats survive
	// until parseValue decides how to convert them.
	dec := json.NewDecoder(resp.Body)
	dec.UseNumber()
	err = dec.Decode(&response)
	if err != nil {
		return result.WithError(err)
	}

	result.QueryResults = make(map[string]*tsdb.QueryResult)
	result.QueryResults["A"] = e.ResponseParser.Parse(&response)

	return result
}
// getQuery renders the first query of the slice into a raw InfluxQL
// string; an empty slice is an error. Only one query per request is
// supported.
func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.QueryContext) (string, error) {
	if len(queries) == 0 {
		return "", fmt.Errorf("query request contains no queries")
	}

	model, err := e.QueryParser.Parse(queries[0].Model, e.DataSourceInfo)
	if err != nil {
		return "", err
	}

	return e.QueryBuilder.Build(model, context)
}
// createRequest builds the GET request for the data source's /query
// endpoint, carrying the rendered InfluxQL, target database, and
// second-precision epoch timestamps.
func (e *InfluxDBExecutor) createRequest(query string) (*http.Request, error) {
	// Previously the parse error was discarded, turning a malformed data
	// source URL into a nil-pointer panic below.
	u, err := url.Parse(e.Url)
	if err != nil {
		return nil, err
	}
	u.Path = path.Join(u.Path, "query")

	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
	if err != nil {
		return nil, err
	}

	params := req.URL.Query()
	params.Set("q", query)
	params.Set("db", e.Database)
	params.Set("epoch", "s") // timestamps come back as epoch seconds
	req.URL.RawQuery = params.Encode()

	req.Header.Set("User-Agent", "Grafana")
	if e.BasicAuth {
		req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword)
	}

	glog.Debug("Influxdb request", "url", req.URL.String())
	return req, nil
}

View File

@@ -0,0 +1,162 @@
package influxdb
import (
"strconv"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/tsdb"
)
// InfluxdbQueryParser converts the JSON query model produced by the
// Grafana query editor into a Query struct.
type InfluxdbQueryParser struct{}

// Parse reads the panel's JSON model and, together with data source
// defaults, builds a Query. It fails on the first malformed section.
func (qp *InfluxdbQueryParser) Parse(model *simplejson.Json, dsInfo *tsdb.DataSourceInfo) (*Query, error) {
	// These fields all have sensible defaults when absent …
	policy := model.Get("policy").MustString("default")
	rawQuery := model.Get("query").MustString("")
	interval := model.Get("interval").MustString("")
	measurement := model.Get("measurement").MustString("")

	// … but resultFormat is required.
	resultFormat, err := model.Get("resultFormat").String()
	if err != nil {
		return nil, err
	}

	tags, err := qp.parseTags(model)
	if err != nil {
		return nil, err
	}

	groupBys, err := qp.parseGroupBy(model)
	if err != nil {
		return nil, err
	}

	selects, err := qp.parseSelects(model)
	if err != nil {
		return nil, err
	}

	// An empty panel interval falls back to the data source's configured
	// default ("timeInterval"), when one is set.
	if interval == "" {
		dsInterval := dsInfo.JsonData.Get("timeInterval").MustString("")
		if dsInterval != "" {
			interval = dsInterval
		}
	}

	return &Query{
		Measurement:  measurement,
		Policy:       policy,
		ResultFormat: resultFormat,
		GroupBy:      groupBys,
		Tags:         tags,
		Selects:      selects,
		RawQuery:     rawQuery,
		Interval:     interval,
	}, nil
}
// parseSelects reads the "select" model section: a list of select
// expressions, each being a chain of query parts.
func (qp *InfluxdbQueryParser) parseSelects(model *simplejson.Json) ([]*Select, error) {
	var selects []*Select

	for _, rawSelect := range model.Get("select").MustArray() {
		selectJson := simplejson.NewFromAny(rawSelect)

		var sel Select
		for _, rawPart := range selectJson.MustArray() {
			partJson := simplejson.NewFromAny(rawPart)
			part, err := qp.parseQueryPart(partJson)
			if err != nil {
				return nil, err
			}
			sel = append(sel, *part)
		}
		selects = append(selects, &sel)
	}

	return selects, nil
}
// parseTags reads the "tags" model section into WHERE conditions.
// Key and value are mandatory; operator and condition are optional.
func (*InfluxdbQueryParser) parseTags(model *simplejson.Json) ([]*Tag, error) {
	var tags []*Tag

	for _, rawTag := range model.Get("tags").MustArray() {
		item := simplejson.NewFromAny(rawTag)

		key, err := item.Get("key").String()
		if err != nil {
			return nil, err
		}
		value, err := item.Get("value").String()
		if err != nil {
			return nil, err
		}

		tag := &Tag{Key: key, Value: value}
		// Optional fields: stay empty when absent.
		if operator, opErr := item.Get("operator").String(); opErr == nil {
			tag.Operator = operator
		}
		if condition, condErr := item.Get("condition").String(); condErr == nil {
			tag.Condition = condition
		}

		tags = append(tags, tag)
	}

	return tags, nil
}
// parseQueryPart converts one {"type": ..., "params": [...]} object into
// a QueryPart. Params may be strings or ints; anything else is an error.
func (*InfluxdbQueryParser) parseQueryPart(model *simplejson.Json) (*QueryPart, error) {
	partType, err := model.Get("type").String()
	if err != nil {
		return nil, err
	}

	var params []string
	for _, rawParam := range model.Get("params").MustArray() {
		paramJson := simplejson.NewFromAny(rawParam)

		if stringParam, strErr := paramJson.String(); strErr == nil {
			params = append(params, stringParam)
			continue
		}

		intParam, intErr := paramJson.Int()
		if intErr != nil {
			return nil, intErr
		}
		params = append(params, strconv.Itoa(intParam))
	}

	return NewQueryPart(partType, params)
}
// parseGroupBy reads the "groupBy" model section as a flat list of
// query parts (time, tag, fill, ...).
func (qp *InfluxdbQueryParser) parseGroupBy(model *simplejson.Json) ([]*QueryPart, error) {
	var groupBys []*QueryPart

	for _, rawGroup := range model.Get("groupBy").MustArray() {
		part, err := qp.parseQueryPart(simplejson.NewFromAny(rawGroup))
		if err != nil {
			return nil, err
		}
		groupBys = append(groupBys, part)
	}

	return groupBys, nil
}

View File

@@ -0,0 +1,178 @@
package influxdb
import (
"testing"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/tsdb"
. "github.com/smartystreets/goconvey/convey"
)
// TestInfluxdbQueryParser verifies that both an editor-built panel model
// and a raw-query model parse into the expected Query structure, and
// that the data source "timeInterval" default is picked up when the
// panel leaves its interval empty.
func TestInfluxdbQueryParser(t *testing.T) {
	Convey("Influxdb query parser", t, func() {
		parser := &InfluxdbQueryParser{}
		dsInfo := &tsdb.DataSourceInfo{
			JsonData: simplejson.New(),
		}

		Convey("can parse influxdb json model", func() {
			json := `
			{
			  "dsType": "influxdb",
			  "groupBy": [
			    {
			      "params": [
			        "$interval"
			      ],
			      "type": "time"
			    },
			    {
			      "params": [
			        "datacenter"
			      ],
			      "type": "tag"
			    },
			    {
			      "params": [
			        "none"
			      ],
			      "type": "fill"
			    }
			  ],
			  "measurement": "logins.count",
			  "policy": "default",
			  "refId": "B",
			  "resultFormat": "time_series",
			  "select": [
			    [
			      {
			        "type": "field",
			        "params": [
			          "value"
			        ]
			      },
			      {
			        "type": "count",
			        "params": []
			      }
			    ],
			    [
			      {
			        "type": "field",
			        "params": [
			          "value"
			        ]
			      },
			      {
			        "type": "bottom",
			        "params": [
			          3
			        ]
			      }
			    ],
			    [
			      {
			        "type": "field",
			        "params": [
			          "value"
			        ]
			      },
			      {
			        "type": "mean",
			        "params": []
			      },
			      {
			        "type": "math",
			        "params": [
			          " / 100"
			        ]
			      }
			    ]
			  ],
			  "tags": [
			    {
			      "key": "datacenter",
			      "operator": "=",
			      "value": "America"
			    },
			    {
			      "condition": "OR",
			      "key": "hostname",
			      "operator": "=",
			      "value": "server1"
			    }
			  ]
			}
			`
			// Panel has no interval, so the data source default applies.
			dsInfo.JsonData.Set("timeInterval", ">20s")
			modelJson, err := simplejson.NewJson([]byte(json))
			So(err, ShouldBeNil)

			res, err := parser.Parse(modelJson, dsInfo)
			So(err, ShouldBeNil)
			So(len(res.GroupBy), ShouldEqual, 3)
			So(len(res.Selects), ShouldEqual, 3)
			So(len(res.Tags), ShouldEqual, 2)
			So(res.Interval, ShouldEqual, ">20s")
		})

		Convey("can part raw query json model", func() {
			json := `
			{
			  "dsType": "influxdb",
			  "groupBy": [
			    {
			      "params": [
			        "$interval"
			      ],
			      "type": "time"
			    },
			    {
			      "params": [
			        "null"
			      ],
			      "type": "fill"
			    }
			  ],
			  "interval": ">10s",
			  "policy": "default",
			  "query": "RawDummieQuery",
			  "rawQuery": true,
			  "refId": "A",
			  "resultFormat": "time_series",
			  "select": [
			    [
			      {
			        "params": [
			          "value"
			        ],
			        "type": "field"
			      },
			      {
			        "params": [
			        ],
			        "type": "mean"
			      }
			    ]
			  ],
			  "tags": [
			  ]
			}
			`

			modelJson, err := simplejson.NewJson([]byte(json))
			So(err, ShouldBeNil)

			res, err := parser.Parse(modelJson, dsInfo)
			So(err, ShouldBeNil)
			So(res.RawQuery, ShouldEqual, "RawDummieQuery")
			So(len(res.GroupBy), ShouldEqual, 2)
			So(len(res.Selects), ShouldEqual, 1)
			So(len(res.Tags), ShouldEqual, 0)
			So(res.Interval, ShouldEqual, ">10s")
		})
	})
}

View File

@@ -0,0 +1,49 @@
package influxdb
// Query is the parsed form of one InfluxDB panel query. Either RawQuery
// is set (user-typed InfluxQL) or the structured fields are used by the
// query builder.
type Query struct {
	Measurement  string
	Policy       string
	ResultFormat string
	Tags         []*Tag
	GroupBy      []*QueryPart
	Selects      []*Select
	RawQuery     string
	Interval     string
}

// Tag is one WHERE condition; Condition ("AND"/"OR") joins it to the
// previous tag and is empty for the first one.
type Tag struct {
	Key       string
	Operator  string
	Value     string
	Condition string
}

// Select is one select expression: a chain of query parts where each
// part wraps the fragment rendered so far.
type Select []QueryPart

// InfluxDbSelect — NOTE(review): appears unused in this package.
type InfluxDbSelect struct {
	Type string
}

// Response mirrors the top-level JSON document returned by InfluxDB's
// /query endpoint.
type Response struct {
	Results []Result
	Err     error
}

// Result is one statement result within a Response.
type Result struct {
	Series   []Row
	Messages []*Message
	Err      error
}

// Message is an informational note attached to a Result.
type Message struct {
	Level string `json:"level,omitempty"`
	Text  string `json:"text,omitempty"`
}

// Row is one series: its tag set, column names and value rows. The
// consumer treats the "time" column as the timestamp (see the response
// parser, which reads it from index 0 of each value row).
type Row struct {
	Name    string            `json:"name,omitempty"`
	Tags    map[string]string `json:"tags,omitempty"`
	Columns []string          `json:"columns,omitempty"`
	Values  [][]interface{}   `json:"values,omitempty"`
}

View File

@@ -0,0 +1,116 @@
package influxdb
import (
"fmt"
"strings"
"github.com/grafana/grafana/pkg/tsdb"
)
// QueryBuilder renders a parsed Query model into a raw InfluxQL string.
type QueryBuilder struct{}

// Build returns the InfluxQL statement for query. If the user supplied a
// raw query, only the $timeFilter and $interval template variables are
// substituted; otherwise the statement is assembled from the parsed
// parts (select, measurement, where, time filter, group by).
func (qb *QueryBuilder) Build(query *Query, queryContext *tsdb.QueryContext) (string, error) {
	if query.RawQuery != "" {
		q := query.RawQuery

		// Replace every occurrence (-1): a raw query may reference the
		// template variables more than once; replacing only the first
		// left the rest unexpanded and produced invalid InfluxQL.
		q = strings.Replace(q, "$timeFilter", qb.renderTimeFilter(query, queryContext), -1)
		q = strings.Replace(q, "$interval", tsdb.CalculateInterval(queryContext.TimeRange), -1)

		return q, nil
	}

	res := qb.renderSelectors(query, queryContext)
	res += qb.renderMeasurement(query)
	res += qb.renderWhereClause(query)
	res += qb.renderTimeFilter(query, queryContext)
	res += qb.renderGroupBy(query, queryContext)

	return res, nil
}
// renderTags renders each WHERE condition as `"key" op 'value'`, with
// the joining keyword (the tag's condition, defaulting to AND) prefixed
// onto every condition after the first.
func (qb *QueryBuilder) renderTags(query *Query) []string {
	var conditions []string

	for i, tag := range query.Tags {
		prefix := ""
		if i > 0 {
			prefix = tag.Condition
			if prefix == "" {
				prefix = "AND"
			}
			prefix += " "
		}

		conditions = append(conditions, fmt.Sprintf(`%s"%s" %s '%s'`, prefix, tag.Key, tag.Operator, tag.Value))
	}

	return conditions
}
// renderTimeFilter renders the relative time window, e.g.
// "time > now() - 2h and time < now() - 1h". The upper bound is omitted
// for "now" (or empty) To values.
func (qb *QueryBuilder) renderTimeFilter(query *Query, queryContext *tsdb.QueryContext) string {
	timeRange := queryContext.TimeRange

	filter := "time > now() - " + timeRange.From
	if timeRange.To != "now" && timeRange.To != "" {
		filter += " and time < now() - " + strings.Replace(timeRange.To, "now-", "", 1)
	}

	return filter
}
// renderSelectors renders the SELECT clause: each select expression is a
// chain of parts, each wrapping the expression rendered so far.
func (qb *QueryBuilder) renderSelectors(query *Query, queryContext *tsdb.QueryContext) string {
	rendered := make([]string, 0, len(query.Selects))

	for _, sel := range query.Selects {
		expr := ""
		for _, part := range *sel {
			expr = part.Render(query, queryContext, expr)
		}
		rendered = append(rendered, expr)
	}

	return "SELECT " + strings.Join(rendered, ", ")
}
// renderMeasurement renders the FROM clause, prefixing the retention
// policy only when it is set and not the server-side "default".
func (qb *QueryBuilder) renderMeasurement(query *Query) string {
	var policy string
	if query.Policy != "" && query.Policy != "default" {
		policy = `"` + query.Policy + `".`
	}

	return fmt.Sprintf(` FROM %s"%s"`, policy, query.Measurement)
}
// renderWhereClause renders " WHERE <tags> AND " — the trailing AND (or
// the bare " WHERE " when there are no tags) connects to the time filter
// that Build appends right after.
func (qb *QueryBuilder) renderWhereClause(query *Query) string {
	conditions := qb.renderTags(query)

	clause := " WHERE " + strings.Join(conditions, " ")
	if len(conditions) > 0 {
		clause += " AND "
	}

	return clause
}
// renderGroupBy renders the GROUP BY clause. Parts are comma-separated,
// except fill(), which InfluxQL expects space-separated after the last
// group-by dimension.
func (qb *QueryBuilder) renderGroupBy(query *Query, queryContext *tsdb.QueryContext) string {
	res := ""
	for i, part := range query.GroupBy {
		switch {
		case i == 0:
			res += " GROUP BY "
		case part.Type != "fill":
			res += ", "
		default:
			res += " "
		}
		res += part.Render(query, queryContext, "")
	}
	return res
}

View File

@@ -0,0 +1,87 @@
package influxdb
import (
"testing"
"github.com/grafana/grafana/pkg/tsdb"
. "github.com/smartystreets/goconvey/convey"
)
// TestInfluxdbQueryBuilder covers the assembled-query path (simple,
// with group-bys/tags), the time-filter rendering, and the raw-query
// pass-through.
func TestInfluxdbQueryBuilder(t *testing.T) {
	Convey("Influxdb query builder", t, func() {
		builder := QueryBuilder{}

		qp1, _ := NewQueryPart("field", []string{"value"})
		qp2, _ := NewQueryPart("mean", []string{})

		groupBy1, _ := NewQueryPart("time", []string{"$interval"})
		groupBy2, _ := NewQueryPart("tag", []string{"datacenter"})
		groupBy3, _ := NewQueryPart("fill", []string{"null"})

		tag1 := &Tag{Key: "hostname", Value: "server1", Operator: "="}
		tag2 := &Tag{Key: "hostname", Value: "server2", Operator: "=", Condition: "OR"}

		queryContext := &tsdb.QueryContext{
			TimeRange: tsdb.NewTimeRange("5m", "now"),
		}

		Convey("can build simple query", func() {
			query := &Query{
				Selects:     []*Select{{*qp1, *qp2}},
				Measurement: "cpu",
				Policy:      "policy",
				GroupBy:     []*QueryPart{groupBy1, groupBy3},
				Interval:    "10s",
			}

			rawQuery, err := builder.Build(query, queryContext)
			So(err, ShouldBeNil)
			So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE time > now() - 5m GROUP BY time(10s) fill(null)`)
		})

		Convey("can build query with group bys", func() {
			query := &Query{
				Selects:     []*Select{{*qp1, *qp2}},
				Measurement: "cpu",
				GroupBy:     []*QueryPart{groupBy1, groupBy2, groupBy3},
				Tags:        []*Tag{tag1, tag2},
				Interval:    "5s",
			}

			rawQuery, err := builder.Build(query, queryContext)
			So(err, ShouldBeNil)
			So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now() - 5m GROUP BY time(5s), "datacenter" fill(null)`)
		})

		Convey("can render time range", func() {
			query := Query{}
			builder := &QueryBuilder{}
			Convey("render from: 2h to now-1h", func() {
				query := Query{}
				queryContext := &tsdb.QueryContext{TimeRange: tsdb.NewTimeRange("2h", "now-1h")}
				So(builder.renderTimeFilter(&query, queryContext), ShouldEqual, "time > now() - 2h and time < now() - 1h")
			})

			Convey("render from: 10m", func() {
				queryContext := &tsdb.QueryContext{TimeRange: tsdb.NewTimeRange("10m", "now")}
				So(builder.renderTimeFilter(&query, queryContext), ShouldEqual, "time > now() - 10m")
			})
		})

		Convey("can build query from raw query", func() {
			query := &Query{
				Selects:     []*Select{{*qp1, *qp2}},
				Measurement: "cpu",
				Policy:      "policy",
				GroupBy:     []*QueryPart{groupBy1, groupBy3},
				Interval:    "10s",
				RawQuery:    "Raw query",
			}

			rawQuery, err := builder.Build(query, queryContext)
			So(err, ShouldBeNil)
			So(rawQuery, ShouldEqual, `Raw query`)
		})
	})
}

View File

@@ -0,0 +1,166 @@
package influxdb
import (
"fmt"
"strings"
"time"
"github.com/grafana/grafana/pkg/tsdb"
)
// renders maps a query-part type (e.g. "mean", "time", "math") to the
// definition used to render it into InfluxQL; populated in init below.
var renders map[string]QueryDefinition

// DefinitionParameters describes one parameter accepted by a query part
// in the editor (name plus a UI type hint).
type DefinitionParameters struct {
	Name string
	Type string
}

// QueryDefinition couples a render function with the parameters the
// part accepts.
type QueryDefinition struct {
	Renderer func(query *Query, queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string
	Params   []DefinitionParameters
}
// init registers the renderer for every supported query-part type.
// Parts without Params take no editor arguments.
func init() {
	renders = make(map[string]QueryDefinition)

	// Field reference and parameterless aggregations: fn(innerExpr).
	renders["field"] = QueryDefinition{Renderer: fieldRenderer}

	renders["spread"] = QueryDefinition{Renderer: functionRenderer}
	renders["count"] = QueryDefinition{Renderer: functionRenderer}
	renders["distinct"] = QueryDefinition{Renderer: functionRenderer}
	renders["integral"] = QueryDefinition{Renderer: functionRenderer}
	renders["mean"] = QueryDefinition{Renderer: functionRenderer}
	renders["median"] = QueryDefinition{Renderer: functionRenderer}
	renders["sum"] = QueryDefinition{Renderer: functionRenderer}

	// Transformations taking one argument.
	renders["derivative"] = QueryDefinition{
		Renderer: functionRenderer,
		Params:   []DefinitionParameters{{Name: "duration", Type: "interval"}},
	}

	renders["non_negative_derivative"] = QueryDefinition{
		Renderer: functionRenderer,
		Params:   []DefinitionParameters{{Name: "duration", Type: "interval"}},
	}
	renders["difference"] = QueryDefinition{Renderer: functionRenderer}
	renders["moving_average"] = QueryDefinition{
		Renderer: functionRenderer,
		Params:   []DefinitionParameters{{Name: "window", Type: "number"}},
	}
	renders["stddev"] = QueryDefinition{Renderer: functionRenderer}

	// Group-by parts ("time" resolves $interval in functionRenderer).
	renders["time"] = QueryDefinition{
		Renderer: functionRenderer,
		Params:   []DefinitionParameters{{Name: "interval", Type: "time"}},
	}
	renders["fill"] = QueryDefinition{
		Renderer: functionRenderer,
		Params:   []DefinitionParameters{{Name: "fill", Type: "string"}},
	}
	renders["elapsed"] = QueryDefinition{
		Renderer: functionRenderer,
		Params:   []DefinitionParameters{{Name: "duration", Type: "interval"}},
	}

	// Selectors with a count/percentile argument.
	renders["bottom"] = QueryDefinition{
		Renderer: functionRenderer,
		Params:   []DefinitionParameters{{Name: "count", Type: "int"}},
	}

	renders["first"] = QueryDefinition{Renderer: functionRenderer}
	renders["last"] = QueryDefinition{Renderer: functionRenderer}
	renders["max"] = QueryDefinition{Renderer: functionRenderer}
	renders["min"] = QueryDefinition{Renderer: functionRenderer}
	renders["percentile"] = QueryDefinition{
		Renderer: functionRenderer,
		Params:   []DefinitionParameters{{Name: "nth", Type: "int"}},
	}
	renders["top"] = QueryDefinition{
		Renderer: functionRenderer,
		Params:   []DefinitionParameters{{Name: "count", Type: "int"}},
	}
	// "tag" reuses the field renderer (quoted identifier).
	renders["tag"] = QueryDefinition{
		Renderer: fieldRenderer,
		Params:   []DefinitionParameters{{Name: "tag", Type: "string"}},
	}

	// Suffix/alias parts operate on the already-rendered expression.
	renders["math"] = QueryDefinition{Renderer: suffixRenderer}
	renders["alias"] = QueryDefinition{Renderer: aliasRenderer}
}
// fieldRenderer quotes a field or tag name, passing the "*" wildcard
// through unquoted.
func fieldRenderer(query *Query, queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
	name := part.Params[0]
	if name == "*" {
		return "*"
	}
	return `"` + name + `"`
}
// getDefinedInterval resolves a user-supplied interval such as ">10s".
// A ">" prefix means "at least": when the interval calculated from the
// time range is larger than the stated minimum, the calculated one wins.
// NOTE(review): a "<" prefix is stripped but otherwise ignored, and
// parse failures fall back silently to the stripped user value
// (tsdb.CalculateInterval can emit units like "1d"/"1y" that
// time.ParseDuration does not accept).
func getDefinedInterval(query *Query, queryContext *tsdb.QueryContext) string {
	setInterval := strings.Replace(strings.Replace(query.Interval, "<", "", 1), ">", "", 1)
	defaultInterval := tsdb.CalculateInterval(queryContext.TimeRange)

	if strings.Contains(query.Interval, ">") {
		parsedDefaultInterval, err := time.ParseDuration(defaultInterval)
		parsedSetInterval, err2 := time.ParseDuration(setInterval)

		if err == nil && err2 == nil && parsedDefaultInterval > parsedSetInterval {
			return defaultInterval
		}
	}

	return setInterval
}
// functionRenderer renders fn(innerExpr, params...), resolving the
// $interval placeholder from the panel interval or the time range.
//
// It works on a local parameter list so rendering never mutates
// part.Params: the previous in-place writes froze the resolved
// $interval and made a second Render of the same part prepend
// innerExpr again.
func functionRenderer(query *Query, queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
	params := make([]string, 0, len(part.Params)+1)
	if innerExpr != "" {
		// The expression rendered so far becomes the first argument.
		params = append(params, innerExpr)
	}

	for _, param := range part.Params {
		if param == "$interval" {
			if query.Interval != "" {
				param = getDefinedInterval(query, queryContext)
			} else {
				param = tsdb.CalculateInterval(queryContext.TimeRange)
			}
		}
		params = append(params, param)
	}

	return fmt.Sprintf("%s(%s)", part.Type, strings.Join(params, ", "))
}
// suffixRenderer appends the raw parameter after the inner expression,
// e.g. "mean(value) / 100" for the math part.
func suffixRenderer(query *Query, queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
	return innerExpr + " " + part.Params[0]
}

// aliasRenderer wraps the inner expression in an InfluxQL AS clause.
func aliasRenderer(query *Query, queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
	return innerExpr + ` AS "` + part.Params[0] + `"`
}

// Render dispatches to the definition's renderer function.
func (r QueryDefinition) Render(query *Query, queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
	return r.Renderer(query, queryContext, part, innerExpr)
}
// NewQueryPart looks up the render definition for typ and wraps it with
// the given parameters. Unknown part types are an error.
func NewQueryPart(typ string, params []string) (*QueryPart, error) {
	def, exist := renders[typ]

	if !exist {
		return nil, fmt.Errorf("Missing query definition for %s", typ)
	}

	return &QueryPart{
		Type:   typ,
		Params: params,
		Def:    def,
	}, nil
}

// QueryPart is one function/field/operator in a select or group-by chain.
type QueryPart struct {
	Def    QueryDefinition
	Type   string
	Params []string
}

// Render produces the InfluxQL fragment for this part, wrapping expr
// (the fragment rendered by the preceding parts of the chain).
func (qp *QueryPart) Render(query *Query, queryContext *tsdb.QueryContext, expr string) string {
	return qp.Def.Renderer(query, queryContext, qp, expr)
}

View File

@@ -0,0 +1,93 @@
package influxdb
import (
"testing"
"github.com/grafana/grafana/pkg/tsdb"
. "github.com/smartystreets/goconvey/convey"
)
// TestInfluxdbQueryPart exercises each renderer kind: field quoting,
// nested wrapping, argument parts, $interval resolution (including the
// ">" minimum semantics), and suffix/alias rendering.
func TestInfluxdbQueryPart(t *testing.T) {
	Convey("Influxdb query parts", t, func() {
		queryContext := &tsdb.QueryContext{TimeRange: tsdb.NewTimeRange("5m", "now")}
		query := &Query{}

		Convey("render field ", func() {
			part, err := NewQueryPart("field", []string{"value"})
			So(err, ShouldBeNil)

			res := part.Render(query, queryContext, "value")
			So(res, ShouldEqual, `"value"`)
		})

		Convey("render nested part", func() {
			part, err := NewQueryPart("derivative", []string{"10s"})
			So(err, ShouldBeNil)

			res := part.Render(query, queryContext, "mean(value)")
			So(res, ShouldEqual, "derivative(mean(value), 10s)")
		})

		Convey("render bottom", func() {
			part, err := NewQueryPart("bottom", []string{"3"})
			So(err, ShouldBeNil)

			res := part.Render(query, queryContext, "value")
			So(res, ShouldEqual, "bottom(value, 3)")
		})

		Convey("render time", func() {
			part, err := NewQueryPart("time", []string{"$interval"})
			So(err, ShouldBeNil)

			res := part.Render(query, queryContext, "")
			So(res, ShouldEqual, "time(200ms)")
		})

		Convey("render time interval >10s", func() {
			part, err := NewQueryPart("time", []string{"$interval"})
			So(err, ShouldBeNil)

			query.Interval = ">10s"

			res := part.Render(query, queryContext, "")
			So(res, ShouldEqual, "time(10s)")
		})

		Convey("render time interval >1s and higher interval calculation", func() {
			part, err := NewQueryPart("time", []string{"$interval"})
			queryContext := &tsdb.QueryContext{TimeRange: tsdb.NewTimeRange("1y", "now")}
			So(err, ShouldBeNil)

			query.Interval = ">1s"

			res := part.Render(query, queryContext, "")
			So(res, ShouldEqual, "time(168h)")
		})

		Convey("render spread", func() {
			part, err := NewQueryPart("spread", []string{})
			So(err, ShouldBeNil)

			res := part.Render(query, queryContext, "value")
			So(res, ShouldEqual, `spread(value)`)
		})

		Convey("render suffix", func() {
			part, err := NewQueryPart("math", []string{"/ 100"})
			So(err, ShouldBeNil)

			res := part.Render(query, queryContext, "mean(value)")
			So(res, ShouldEqual, "mean(value) / 100")
		})

		Convey("render alias", func() {
			part, err := NewQueryPart("alias", []string{"test"})
			So(err, ShouldBeNil)

			res := part.Render(query, queryContext, "mean(value)")
			So(res, ShouldEqual, `mean(value) AS "test"`)
		})
	})
}

View File

@@ -0,0 +1,94 @@
package influxdb
import (
	"encoding/json"
	"fmt"
	"sort"
	"strings"

	"github.com/grafana/grafana/pkg/tsdb"
	"gopkg.in/guregu/null.v3"
)
// ResponseParser converts an InfluxDB /query JSON response into tsdb
// time series.
type ResponseParser struct{}

// Parse flattens every result's series into one QueryResult.
func (rp *ResponseParser) Parse(response *Response) *tsdb.QueryResult {
	queryRes := tsdb.NewQueryResult()

	for _, result := range response.Results {
		queryRes.Series = append(queryRes.Series, rp.transformRows(result.Series, queryRes)...)
	}

	return queryRes
}
// transformRows converts InfluxDB rows into tsdb time series: one series
// per (row, non-time column) pair. Points whose timestamp cannot be
// parsed are silently dropped.
// NOTE(review): the queryResult parameter is not used.
func (rp *ResponseParser) transformRows(rows []Row, queryResult *tsdb.QueryResult) tsdb.TimeSeriesSlice {
	var result tsdb.TimeSeriesSlice

	for _, row := range rows {
		for columnIndex, column := range row.Columns {
			// The "time" column carries the timestamps, not values.
			if column == "time" {
				continue
			}

			var points tsdb.TimeSeriesPoints
			for _, valuePair := range row.Values {
				point, err := rp.parseTimepoint(valuePair, columnIndex)
				if err == nil {
					points = append(points, point)
				}
			}
			result = append(result, &tsdb.TimeSeries{
				Name:   rp.formatSerieName(row, column),
				Points: points,
			})
		}
	}

	return result
}
// formatSerieName builds "measurement.column { k: v ... }". Tag keys are
// sorted so the name is deterministic — Go map iteration order is
// random, which previously made series names (and hence legend/alert
// identity) change between refreshes.
func (rp *ResponseParser) formatSerieName(row Row, column string) string {
	keys := make([]string, 0, len(row.Tags))
	for k := range row.Tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	tags := make([]string, 0, len(keys))
	for _, k := range keys {
		tags = append(tags, fmt.Sprintf("%s: %s", k, row.Tags[k]))
	}

	tagText := ""
	if len(tags) > 0 {
		tagText = fmt.Sprintf(" { %s }", strings.Join(tags, " "))
	}

	return fmt.Sprintf("%s.%s%s", row.Name, column, tagText)
}
// parseTimepoint converts one raw value row into a TimePoint: the value
// is read at valuePosition, the timestamp from column 0.
// NOTE(review): a non-json.Number timestamp is silently type-asserted to
// the zero json.Number; the subsequent Float64 then returns the error
// that makes transformRows drop the point.
func (rp *ResponseParser) parseTimepoint(valuePair []interface{}, valuePosition int) (tsdb.TimePoint, error) {
	var value null.Float = rp.parseValue(valuePair[valuePosition])

	timestampNumber, _ := valuePair[0].(json.Number)
	timestamp, err := timestampNumber.Float64()
	if err != nil {
		return tsdb.TimePoint{}, err
	}

	return tsdb.NewTimePoint(value, timestamp), nil
}
// parseValue converts a raw JSON value into a nullable float. Anything
// that is not a json.Number (including JSON null) yields an invalid
// null.Float; integer numbers are widened to float64.
func (rp *ResponseParser) parseValue(value interface{}) null.Float {
	number, ok := value.(json.Number)
	if !ok {
		return null.FloatFromPtr(nil)
	}

	if fvalue, err := number.Float64(); err == nil {
		return null.FloatFrom(fvalue)
	}
	if ivalue, err := number.Int64(); err == nil {
		return null.FloatFrom(float64(ivalue))
	}

	return null.FloatFromPtr(nil)
}

View File

@@ -0,0 +1,59 @@
package influxdb
import (
"encoding/json"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
// TestInfluxdbResponseParser verifies that ResponseParser fans a single
// InfluxDB row out into one series per value column, keeps every value
// row as a point, maps the literal "null" to an invalid (null) float,
// and formats series names as "<name>.<column> { tag: value }".
func TestInfluxdbResponseParser(t *testing.T) {
	Convey("Influxdb response parser", t, func() {
		parser := &ResponseParser{}

		// One measurement ("cpu") with two value columns (mean, sum), a
		// single tag, and three value rows; the third row's "mean" is the
		// string "null", exercising the null-point path.
		response := &Response{
			Results: []Result{
				Result{
					Series: []Row{
						{
							Name:    "cpu",
							Columns: []string{"time", "mean", "sum"},
							Tags:    map[string]string{"datacenter": "America"},
							Values: [][]interface{}{
								{json.Number("111"), json.Number("222"), json.Number("333")},
								{json.Number("111"), json.Number("222"), json.Number("333")},
								{json.Number("111"), json.Number("null"), json.Number("333")},
							},
						},
					},
				},
			},
		}

		result := parser.Parse(response)

		// Two value columns -> two series ("time" is skipped).
		Convey("can parse all series", func() {
			So(len(result.Series), ShouldEqual, 2)
		})

		// All three value rows survive as points in each series.
		Convey("can parse all points", func() {
			So(len(result.Series[0].Points), ShouldEqual, 3)
			So(len(result.Series[1].Points), ShouldEqual, 3)
		})

		// Point layout is [value, timestamp]; index [1][0] is row 2's value.
		Convey("can parse multi row result", func() {
			So(result.Series[0].Points[1][0].Float64, ShouldEqual, float64(222))
			So(result.Series[1].Points[1][0].Float64, ShouldEqual, float64(333))
		})

		// json.Number("null") cannot be parsed -> invalid null.Float.
		Convey("can parse null points", func() {
			So(result.Series[0].Points[2][0].Valid, ShouldBeFalse)
		})

		Convey("can format serie names", func() {
			So(result.Series[0].Name, ShouldEqual, "cpu.mean { datacenter: America }")
			So(result.Series[1].Name, ShouldEqual, "cpu.sum { datacenter: America }")
		})
	})
}

145
pkg/tsdb/interval.go Normal file
View File

@@ -0,0 +1,145 @@
package tsdb
import (
"fmt"
"time"
)
// Tuning knobs for automatic interval calculation.
var (
	// defaultRes is the target number of data points per time range; the
	// calculated interval is roughly range / defaultRes.
	defaultRes int64 = 1500
	// minInterval is the smallest interval CalculateInterval ever returns.
	minInterval time.Duration = 1 * time.Millisecond
	year        time.Duration = time.Hour * 24 * 365
	// day was previously declared as time.Hour * 24 * 365 — a copy-paste
	// of year — which made formatDuration's "d" branch unreachable for
	// anything under a year (e.g. 48h rendered as "48h", never "2d").
	day time.Duration = time.Hour * 24
)
// CalculateInterval returns a formatted interval string sized so that the
// given time range yields roughly defaultRes data points, clamped below
// at minInterval and rounded onto a human-friendly step by roundInterval.
func CalculateInterval(timerange *TimeRange) string {
	from := timerange.MustGetFrom().UnixNano()
	to := timerange.MustGetTo().UnixNano()

	interval := time.Duration((to - from) / defaultRes)
	if interval < minInterval {
		interval = minInterval
	} else {
		interval = roundInterval(interval)
	}

	return formatDuration(interval)
}
// formatDuration renders a duration using the single largest time unit
// that fits (y, d, h, m, s, ms), truncating the remainder; anything
// below one millisecond is reported as "1ms".
func formatDuration(inter time.Duration) string {
	units := []struct {
		size   time.Duration
		suffix string
	}{
		{year, "y"},
		{day, "d"},
		{time.Hour, "h"},
		{time.Minute, "m"},
		{time.Second, "s"},
		{time.Millisecond, "ms"},
	}

	for _, u := range units {
		if inter >= u.size {
			return fmt.Sprintf("%d%s", inter/u.size, u.suffix)
		}
	}

	return "1ms"
}
// roundInterval snaps an arbitrary interval onto a fixed ladder of
// human-friendly step sizes (10ms, 20ms, 50ms, ... 30d, 1y). Each case's
// threshold sits roughly halfway between two adjacent steps so the
// nearest step wins. Intervals above the last threshold round to 1y.
func roundInterval(interval time.Duration) time.Duration {
	switch {
	// 0.015s
	case interval <= 15*time.Millisecond:
		return time.Millisecond * 10 // 0.01s
	// 0.035s
	case interval <= 35*time.Millisecond:
		return time.Millisecond * 20 // 0.02s
	// 0.075s
	case interval <= 75*time.Millisecond:
		return time.Millisecond * 50 // 0.05s
	// 0.15s
	case interval <= 150*time.Millisecond:
		return time.Millisecond * 100 // 0.1s
	// 0.35s
	case interval <= 350*time.Millisecond:
		return time.Millisecond * 200 // 0.2s
	// 0.75s
	case interval <= 750*time.Millisecond:
		return time.Millisecond * 500 // 0.5s
	// 1.5s
	case interval <= 1500*time.Millisecond:
		return time.Millisecond * 1000 // 1s
	// 3.5s
	case interval <= 3500*time.Millisecond:
		return time.Millisecond * 2000 // 2s
	// 7.5s
	case interval <= 7500*time.Millisecond:
		return time.Millisecond * 5000 // 5s
	// 12.5s
	case interval <= 12500*time.Millisecond:
		return time.Millisecond * 10000 // 10s
	// 17.5s
	case interval <= 17500*time.Millisecond:
		return time.Millisecond * 15000 // 15s
	// 25s
	case interval <= 25000*time.Millisecond:
		return time.Millisecond * 20000 // 20s
	// 45s
	case interval <= 45000*time.Millisecond:
		return time.Millisecond * 30000 // 30s
	// 1.5m
	case interval <= 90000*time.Millisecond:
		return time.Millisecond * 60000 // 1m
	// 3.5m
	case interval <= 210000*time.Millisecond:
		return time.Millisecond * 120000 // 2m
	// 7.5m
	case interval <= 450000*time.Millisecond:
		return time.Millisecond * 300000 // 5m
	// 12.5m
	case interval <= 750000*time.Millisecond:
		return time.Millisecond * 600000 // 10m
	// 17.5m (the original comment repeated "12.5m" here)
	case interval <= 1050000*time.Millisecond:
		return time.Millisecond * 900000 // 15m
	// 25m
	case interval <= 1500000*time.Millisecond:
		return time.Millisecond * 1200000 // 20m
	// 45m
	case interval <= 2700000*time.Millisecond:
		return time.Millisecond * 1800000 // 30m
	// 1.5h
	case interval <= 5400000*time.Millisecond:
		return time.Millisecond * 3600000 // 1h
	// 2.5h
	case interval <= 9000000*time.Millisecond:
		return time.Millisecond * 7200000 // 2h
	// 4.5h
	case interval <= 16200000*time.Millisecond:
		return time.Millisecond * 10800000 // 3h
	// 9h
	case interval <= 32400000*time.Millisecond:
		return time.Millisecond * 21600000 // 6h
	// 24h
	case interval <= 86400000*time.Millisecond:
		return time.Millisecond * 43200000 // 12h
	// 1w — the original had separate 48h and 1w cases both returning 24h;
	// they are merged here since the result was identical.
	case interval <= 604800000*time.Millisecond:
		return time.Millisecond * 86400000 // 24h
	// 3w
	case interval <= 1814400000*time.Millisecond:
		return time.Millisecond * 604800000 // 1w
	// 6w (the original comment said "2y", but 3628800000ms is 42 days)
	case interval < 3628800000*time.Millisecond:
		return time.Millisecond * 2592000000 // 30d
	default:
		return time.Millisecond * 31536000000 // 1y
	}
}

57
pkg/tsdb/interval_test.go Normal file
View File

@@ -0,0 +1,57 @@
package tsdb
import (
"testing"
"time"
"github.com/grafana/grafana/pkg/setting"
. "github.com/smartystreets/goconvey/convey"
)
// TestInterval checks CalculateInterval's output for several common
// dashboard time ranges (5m/15m/30m/1h against "now"), plus the rounding
// and formatting helpers in isolation.
func TestInterval(t *testing.T) {
	Convey("Default interval ", t, func() {
		// Config context is initialized because NewTimeRange/Calculate
		// run inside the normal server setup in production.
		setting.NewConfigContext(&setting.CommandLineArgs{
			HomePath: "../../",
		})

		// Expected values follow from range / defaultRes (1500) rounded
		// onto roundInterval's ladder: 5m/1500 = 200ms, etc.
		Convey("for 5min", func() {
			tr := NewTimeRange("5m", "now")

			interval := CalculateInterval(tr)
			So(interval, ShouldEqual, "200ms")
		})

		Convey("for 15min", func() {
			tr := NewTimeRange("15m", "now")

			interval := CalculateInterval(tr)
			So(interval, ShouldEqual, "500ms")
		})

		Convey("for 30min", func() {
			tr := NewTimeRange("30m", "now")

			interval := CalculateInterval(tr)
			So(interval, ShouldEqual, "1s")
		})

		Convey("for 1h", func() {
			tr := NewTimeRange("1h", "now")

			interval := CalculateInterval(tr)
			So(interval, ShouldEqual, "2s")
		})

		// 30ms sits below the 35ms threshold (rounds down to 20ms);
		// 45ms sits above it (rounds up to 50ms).
		Convey("Round interval", func() {
			So(roundInterval(time.Millisecond*30), ShouldEqual, time.Millisecond*20)
			So(roundInterval(time.Millisecond*45), ShouldEqual, time.Millisecond*50)
		})

		// formatDuration truncates to the largest fitting unit.
		Convey("Format value", func() {
			So(formatDuration(time.Second*61), ShouldEqual, "1m")
			So(formatDuration(time.Millisecond*30), ShouldEqual, "30ms")
			So(formatDuration(time.Hour*23), ShouldEqual, "23h")
			So(formatDuration(time.Hour*24*367), ShouldEqual, "1y")
		})
	})
}

View File

@@ -39,6 +39,7 @@ type DataSourceInfo struct {
BasicAuth bool
BasicAuthUser string
BasicAuthPassword string
JsonData *simplejson.Json
}
type BatchTiming struct {
@@ -51,6 +52,11 @@ type BatchResult struct {
Timings *BatchTiming
}
func (br *BatchResult) WithError(err error) *BatchResult {
br.Error = err
return br
}
type QueryResult struct {
Error error `json:"error"`
RefId string `json:"refId"`
@@ -72,15 +78,15 @@ func NewQueryResult() *QueryResult {
}
}
func NewTimePoint(value float64, timestamp float64) TimePoint {
return TimePoint{null.FloatFrom(value), null.FloatFrom(timestamp)}
func NewTimePoint(value null.Float, timestamp float64) TimePoint {
return TimePoint{value, null.FloatFrom(timestamp)}
}
func NewTimeSeriesPointsFromArgs(values ...float64) TimeSeriesPoints {
points := make(TimeSeriesPoints, 0)
for i := 0; i < len(values); i += 2 {
points = append(points, NewTimePoint(values[i], values[i+1]))
points = append(points, NewTimePoint(null.FloatFrom(values[i]), values[i+1]))
}
return points

View File

@@ -8,6 +8,8 @@ import (
"strings"
"time"
"gopkg.in/guregu/null.v3"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/prometheus/client_golang/api/prometheus"
@@ -50,12 +52,12 @@ func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlic
client, err := e.getClient()
if err != nil {
return resultWithError(result, err)
return result.WithError(err)
}
query, err := parseQuery(queries, queryContext)
if err != nil {
return resultWithError(result, err)
return result.WithError(err)
}
timeRange := prometheus.Range{
@@ -67,12 +69,12 @@ func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlic
value, err := client.QueryRange(ctx, query.Expr, timeRange)
if err != nil {
return resultWithError(result, err)
return result.WithError(err)
}
queryResult, err := parseResponse(value, query)
if err != nil {
return resultWithError(result, err)
return result.WithError(err)
}
result.QueryResults = queryResult
return result
@@ -145,7 +147,7 @@ func parseResponse(value pmodel.Value, query *PrometheusQuery) (map[string]*tsdb
}
for _, k := range v.Values {
series.Points = append(series.Points, tsdb.NewTimePoint(float64(k.Value), float64(k.Timestamp.Unix()*1000)))
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(float64(k.Value)), float64(k.Timestamp.Unix()*1000)))
}
queryRes.Series = append(queryRes.Series, &series)
@@ -155,7 +157,8 @@ func parseResponse(value pmodel.Value, query *PrometheusQuery) (map[string]*tsdb
return queryResults, nil
}
/*
func resultWithError(result *tsdb.BatchResult, err error) *tsdb.BatchResult {
result.Error = err
return result
}
}*/

View File

@@ -51,6 +51,8 @@ func HandleRequest(ctx context.Context, req *Request) (*Response, error) {
go batch.process(ctx, context)
}
}
case <-ctx.Done():
return nil, ctx.Err()
}
}

View File

@@ -6,6 +6,8 @@ import (
"strings"
"time"
"gopkg.in/guregu/null.v3"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/tsdb"
)
@@ -42,7 +44,7 @@ func init() {
walker := rand.Float64() * 100
for i := int64(0); i < 10000 && timeWalkerMs < to; i++ {
points = append(points, tsdb.NewTimePoint(walker, float64(timeWalkerMs)))
points = append(points, tsdb.NewTimePoint(null.FloatFrom(walker), float64(timeWalkerMs)))
walker += rand.Float64() - 0.5
timeWalkerMs += query.IntervalMs
@@ -73,7 +75,7 @@ func init() {
series := newSeriesForQuery(query)
outsideTime := context.TimeRange.MustGetFrom().Add(-1*time.Hour).Unix() * 1000
series.Points = append(series.Points, tsdb.NewTimePoint(10, float64(outsideTime)))
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(10), float64(outsideTime)))
queryRes.Series = append(queryRes.Series, series)
return queryRes
@@ -88,10 +90,13 @@ func init() {
queryRes := tsdb.NewQueryResult()
stringInput := query.Model.Get("stringInput").MustString()
values := []float64{}
values := []null.Float{}
for _, strVal := range strings.Split(stringInput, ",") {
if strVal == "null" {
values = append(values, null.FloatFromPtr(nil))
}
if val, err := strconv.ParseFloat(strVal, 64); err == nil {
values = append(values, val)
values = append(values, null.FloatFrom(val))
}
}
@@ -105,7 +110,7 @@ func init() {
step := (endTime - startTime) / int64(len(values)-1)
for _, val := range values {
series.Points = append(series.Points, tsdb.NewTimePoint(val, float64(startTime)))
series.Points = append(series.Points, tsdb.TimePoint{val, null.FloatFrom(float64(startTime))})
startTime += step
}