diff --git a/pkg/tsdb/mssql/macros.go b/pkg/tsdb/mssql/macros.go
index ad3d1edd5d7..2c16b5cb27f 100644
--- a/pkg/tsdb/mssql/macros.go
+++ b/pkg/tsdb/mssql/macros.go
@@ -14,18 +14,18 @@ import (
 const rsIdentifier = `([_a-zA-Z0-9]+)`
 const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)`
 
-type MsSqlMacroEngine struct {
-	TimeRange *tsdb.TimeRange
-	Query     *tsdb.Query
+type msSqlMacroEngine struct {
+	timeRange *tsdb.TimeRange
+	query     *tsdb.Query
 }
 
-func NewMssqlMacroEngine() tsdb.SqlMacroEngine {
-	return &MsSqlMacroEngine{}
+func newMssqlMacroEngine() tsdb.SqlMacroEngine {
+	return &msSqlMacroEngine{}
 }
 
-func (m *MsSqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
-	m.TimeRange = timeRange
-	m.Query = query
+func (m *msSqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
+	m.timeRange = timeRange
+	m.query = query
 	rExp, _ := regexp.Compile(sExpr)
 	var macroError error
 
@@ -66,7 +66,7 @@ func replaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]str
 	return result + str[lastIndex:]
 }
 
-func (m *MsSqlMacroEngine) evaluateMacro(name string, args []string) (string, error) {
+func (m *msSqlMacroEngine) evaluateMacro(name string, args []string) (string, error) {
 	switch name {
 	case "__time":
 		if len(args) == 0 {
@@ -83,11 +83,11 @@ func (m *MsSqlMacroEngine) evaluateMacro(name string, args []string) (string, er
 			return "", fmt.Errorf("missing time column argument for macro %v", name)
 		}
 
-		return fmt.Sprintf("%s BETWEEN '%s' AND '%s'", args[0], m.TimeRange.GetFromAsTimeUTC().Format(time.RFC3339), m.TimeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil
+		return fmt.Sprintf("%s BETWEEN '%s' AND '%s'", args[0], m.timeRange.GetFromAsTimeUTC().Format(time.RFC3339), m.timeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil
 	case "__timeFrom":
-		return fmt.Sprintf("'%s'", m.TimeRange.GetFromAsTimeUTC().Format(time.RFC3339)), nil
+		return fmt.Sprintf("'%s'", m.timeRange.GetFromAsTimeUTC().Format(time.RFC3339)), nil
 	case "__timeTo":
-		return fmt.Sprintf("'%s'", m.TimeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil
+		return fmt.Sprintf("'%s'", m.timeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil
 	case "__timeGroup":
 		if len(args) < 2 {
 			return "", fmt.Errorf("macro %v needs time column and interval", name)
@@ -97,16 +97,16 @@ func (m *MsSqlMacroEngine) evaluateMacro(name string, args []string) (string, er
 			return "", fmt.Errorf("error parsing interval %v", args[1])
 		}
 		if len(args) == 3 {
-			m.Query.Model.Set("fill", true)
-			m.Query.Model.Set("fillInterval", interval.Seconds())
+			m.query.Model.Set("fill", true)
+			m.query.Model.Set("fillInterval", interval.Seconds())
 			if args[2] == "NULL" {
-				m.Query.Model.Set("fillNull", true)
+				m.query.Model.Set("fillNull", true)
 			} else {
 				floatVal, err := strconv.ParseFloat(args[2], 64)
 				if err != nil {
 					return "", fmt.Errorf("error parsing fill value %v", args[2])
 				}
-				m.Query.Model.Set("fillValue", floatVal)
+				m.query.Model.Set("fillValue", floatVal)
 			}
 		}
 		return fmt.Sprintf("FLOOR(DATEDIFF(second, '1970-01-01', %s)/%.0f)*%.0f", args[0], interval.Seconds(), interval.Seconds()), nil
@@ -114,11 +114,11 @@ func (m *MsSqlMacroEngine) evaluateMacro(name string, args []string) (string, er
 		if len(args) == 0 {
 			return "", fmt.Errorf("missing time column argument for macro %v", name)
 		}
-		return fmt.Sprintf("%s >= %d AND %s <= %d", args[0], m.TimeRange.GetFromAsSecondsEpoch(), args[0], m.TimeRange.GetToAsSecondsEpoch()), nil
+		return fmt.Sprintf("%s >= %d AND %s <= %d", args[0], m.timeRange.GetFromAsSecondsEpoch(), args[0], m.timeRange.GetToAsSecondsEpoch()), nil
 	case "__unixEpochFrom":
-		return fmt.Sprintf("%d", m.TimeRange.GetFromAsSecondsEpoch()), nil
+		return fmt.Sprintf("%d", m.timeRange.GetFromAsSecondsEpoch()), nil
 	case "__unixEpochTo":
-		return fmt.Sprintf("%d", m.TimeRange.GetToAsSecondsEpoch()), nil
+		return fmt.Sprintf("%d", m.timeRange.GetToAsSecondsEpoch()), nil
 	default:
 		return "", fmt.Errorf("Unknown macro %v", name)
 	}
diff --git a/pkg/tsdb/mssql/macros_test.go b/pkg/tsdb/mssql/macros_test.go
index 49368fe3631..1895cd99442 100644
--- a/pkg/tsdb/mssql/macros_test.go
+++ b/pkg/tsdb/mssql/macros_test.go
@@ -14,7 +14,7 @@ import (
 
 func TestMacroEngine(t *testing.T) {
 	Convey("MacroEngine", t, func() {
-		engine := &MsSqlMacroEngine{}
+		engine := &msSqlMacroEngine{}
 		query := &tsdb.Query{
 			Model: simplejson.New(),
 		}
diff --git a/pkg/tsdb/mssql/mssql.go b/pkg/tsdb/mssql/mssql.go
index eb71259b46b..72e57d03fa0 100644
--- a/pkg/tsdb/mssql/mssql.go
+++ b/pkg/tsdb/mssql/mssql.go
@@ -1,49 +1,40 @@
 package mssql
 
 import (
-	"container/list"
-	"context"
 	"database/sql"
 	"fmt"
 	"strconv"
 	"strings"
 
-	"math"
-
 	_ "github.com/denisenkom/go-mssqldb"
 	"github.com/go-xorm/core"
-	"github.com/grafana/grafana/pkg/components/null"
 	"github.com/grafana/grafana/pkg/log"
 	"github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/tsdb"
 )
 
-type MssqlQueryEndpoint struct {
-	sqlEngine tsdb.SqlEngine
-	log       log.Logger
-}
-
 func init() {
-	tsdb.RegisterTsdbQueryEndpoint("mssql", NewMssqlQueryEndpoint)
+	tsdb.RegisterTsdbQueryEndpoint("mssql", newMssqlQueryEndpoint)
 }
 
-func NewMssqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
-	endpoint := &MssqlQueryEndpoint{
-		log: log.New("tsdb.mssql"),
-	}
-
-	endpoint.sqlEngine = &tsdb.DefaultSqlEngine{
-		MacroEngine: NewMssqlMacroEngine(),
-	}
+func newMssqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
+	logger := log.New("tsdb.mssql")
 
 	cnnstr := generateConnectionString(datasource)
-	endpoint.log.Debug("getEngine", "connection", cnnstr)
+	logger.Debug("getEngine", "connection", cnnstr)
 
-	if err := endpoint.sqlEngine.InitEngine("mssql", datasource, cnnstr); err != nil {
-		return nil, err
+	config := tsdb.SqlQueryEndpointConfiguration{
+		DriverName:        "mssql",
+		ConnectionString:  cnnstr,
+		Datasource:        datasource,
+		MetricColumnTypes: []string{"VARCHAR", "CHAR", "NVARCHAR", "NCHAR"},
 	}
 
-	return endpoint, nil
+	rowTransformer := mssqlRowTransformer{
+		log: logger,
+	}
+
+	return tsdb.NewSqlQueryEndpoint(&config, &rowTransformer, newMssqlMacroEngine(), logger)
 }
 
 func generateConnectionString(datasource *models.DataSource) string {
@@ -70,71 +61,16 @@ func generateConnectionString(datasource *models.DataSource) string {
 	)
 }
 
-// Query is the main function for the MssqlQueryEndpoint
-func (e *MssqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
-	return e.sqlEngine.Query(ctx, dsInfo, tsdbQuery, e.transformToTimeSeries, e.transformToTable)
+type mssqlRowTransformer struct {
+	log log.Logger
 }
 
-func (e MssqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
-	columnNames, err := rows.Columns()
-	columnCount := len(columnNames)
+func (t *mssqlRowTransformer) Transform(columnTypes []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) {
+	values := make([]interface{}, len(columnTypes))
+	valuePtrs := make([]interface{}, len(columnTypes))
 
-	if err != nil {
-		return err
-	}
-
-	rowLimit := 1000000
-	rowCount := 0
-	timeIndex := -1
-
-	table := &tsdb.Table{
-		Columns: make([]tsdb.TableColumn, columnCount),
-		Rows:    make([]tsdb.RowValues, 0),
-	}
-
-	for i, name := range columnNames {
-		table.Columns[i].Text = name
-
-		// check if there is a column named time
-		switch name {
-		case "time":
-			timeIndex = i
-		}
-	}
-
-	columnTypes, err := rows.ColumnTypes()
-	if err != nil {
-		return err
-	}
-
-	for ; rows.Next(); rowCount++ {
-		if rowCount > rowLimit {
-			return fmt.Errorf("MsSQL query row limit exceeded, limit %d", rowLimit)
-		}
-
-		values, err := e.getTypedRowData(columnTypes, rows)
-		if err != nil {
-			return err
-		}
-
-		// converts column named time to unix timestamp in milliseconds
-		// to make native mssql datetime types and epoch dates work in
-		// annotation and table queries.
-		tsdb.ConvertSqlTimeColumnToEpochMs(values, timeIndex)
-		table.Rows = append(table.Rows, values)
-	}
-
-	result.Tables = append(result.Tables, table)
-	result.Meta.Set("rowCount", rowCount)
-	return nil
-}
-
-func (e MssqlQueryEndpoint) getTypedRowData(types []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) {
-	values := make([]interface{}, len(types))
-	valuePtrs := make([]interface{}, len(types))
-
-	for i, stype := range types {
-		e.log.Debug("type", "type", stype)
+	for i, stype := range columnTypes {
+		t.log.Debug("type", "type", stype)
 		valuePtrs[i] = &values[i]
 	}
 
@@ -144,17 +80,17 @@ func (e MssqlQueryEndpoint) getTypedRowData(types []*sql.ColumnType, rows *core.
 
 	// convert types not handled by denisenkom/go-mssqldb
 	// unhandled types are returned as []byte
-	for i := 0; i < len(types); i++ {
+	for i := 0; i < len(columnTypes); i++ {
 		if value, ok := values[i].([]byte); ok {
-			switch types[i].DatabaseTypeName() {
+			switch columnTypes[i].DatabaseTypeName() {
 			case "MONEY", "SMALLMONEY", "DECIMAL":
 				if v, err := strconv.ParseFloat(string(value), 64); err == nil {
 					values[i] = v
 				} else {
-					e.log.Debug("Rows", "Error converting numeric to float", value)
+					t.log.Debug("Rows", "Error converting numeric to float", value)
 				}
 			default:
-				e.log.Debug("Rows", "Unknown database type", types[i].DatabaseTypeName(), "value", value)
+				t.log.Debug("Rows", "Unknown database type", columnTypes[i].DatabaseTypeName(), "value", value)
 				values[i] = string(value)
 			}
 		}
@@ -162,157 +98,3 @@ func (e MssqlQueryEndpoint) getTypedRowData(types []*sql.ColumnType, rows *core.
 
 	return values, nil
 }
-
-func (e MssqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
-	pointsBySeries := make(map[string]*tsdb.TimeSeries)
-	seriesByQueryOrder := list.New()
-
-	columnNames, err := rows.Columns()
-	if err != nil {
-		return err
-	}
-
-	columnTypes, err := rows.ColumnTypes()
-	if err != nil {
-		return err
-	}
-
-	rowLimit := 1000000
-	rowCount := 0
-	timeIndex := -1
-	metricIndex := -1
-
-	// check columns of resultset: a column named time is mandatory
-	// the first text column is treated as metric name unless a column named metric is present
-	for i, col := range columnNames {
-		switch col {
-		case "time":
-			timeIndex = i
-		case "metric":
-			metricIndex = i
-		default:
-			if metricIndex == -1 {
-				switch columnTypes[i].DatabaseTypeName() {
-				case "VARCHAR", "CHAR", "NVARCHAR", "NCHAR":
-					metricIndex = i
-				}
-			}
-		}
-	}
-
-	if timeIndex == -1 {
-		return fmt.Errorf("Found no column named time")
-	}
-
-	fillMissing := query.Model.Get("fill").MustBool(false)
-	var fillInterval float64
-	fillValue := null.Float{}
-	if fillMissing {
-		fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000
-		if !query.Model.Get("fillNull").MustBool(false) {
-			fillValue.Float64 = query.Model.Get("fillValue").MustFloat64()
-			fillValue.Valid = true
-		}
-	}
-
-	for rows.Next() {
-		var timestamp float64
-		var value null.Float
-		var metric string
-
-		if rowCount > rowLimit {
-			return fmt.Errorf("MSSQL query row limit exceeded, limit %d", rowLimit)
-		}
-
-		values, err := e.getTypedRowData(columnTypes, rows)
-		if err != nil {
-			return err
-		}
-
-		// converts column named time to unix timestamp in milliseconds to make
-		// native mysql datetime types and epoch dates work in
-		// annotation and table queries.
-		tsdb.ConvertSqlTimeColumnToEpochMs(values, timeIndex)
-
-		switch columnValue := values[timeIndex].(type) {
-		case int64:
-			timestamp = float64(columnValue)
-		case float64:
-			timestamp = columnValue
-		default:
-			return fmt.Errorf("Invalid type for column time, must be of type timestamp or unix timestamp, got: %T %v", columnValue, columnValue)
-		}
-
-		if metricIndex >= 0 {
-			if columnValue, ok := values[metricIndex].(string); ok {
-				metric = columnValue
-			} else {
-				return fmt.Errorf("Column metric must be of type CHAR, VARCHAR, NCHAR or NVARCHAR. metric column name: %s type: %s but datatype is %T", columnNames[metricIndex], columnTypes[metricIndex].DatabaseTypeName(), values[metricIndex])
-			}
-		}
-
-		for i, col := range columnNames {
-			if i == timeIndex || i == metricIndex {
-				continue
-			}
-
-			if value, err = tsdb.ConvertSqlValueColumnToFloat(col, values[i]); err != nil {
-				return err
-			}
-
-			if metricIndex == -1 {
-				metric = col
-			}
-
-			series, exist := pointsBySeries[metric]
-			if !exist {
-				series = &tsdb.TimeSeries{Name: metric}
-				pointsBySeries[metric] = series
-				seriesByQueryOrder.PushBack(metric)
-			}
-
-			if fillMissing {
-				var intervalStart float64
-				if !exist {
-					intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
-				} else {
-					intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval
-				}
-
-				// align interval start
-				intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
-
-				for i := intervalStart; i < timestamp; i += fillInterval {
-					series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
-					rowCount++
-				}
-			}
-
-			series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
-
-			e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
-		}
-	}
-
-	for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
-		key := elem.Value.(string)
-		result.Series = append(result.Series, pointsBySeries[key])
-
-		if fillMissing {
-			series := pointsBySeries[key]
-			// fill in values from last fetched value till interval end
-			intervalStart := series.Points[len(series.Points)-1][1].Float64
-			intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)
-
-			// align interval start
-			intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
-			for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
-				series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
-				rowCount++
-			}
-		}
-	}
-
-	result.Meta.Set("rowCount", rowCount)
-	return nil
-}
diff --git a/pkg/tsdb/mssql/mssql_test.go b/pkg/tsdb/mssql/mssql_test.go
index db04d6d1f02..86484cb9d5e 100644
--- a/pkg/tsdb/mssql/mssql_test.go
+++ b/pkg/tsdb/mssql/mssql_test.go
@@ -8,8 +8,9 @@ import (
 	"time"
 
 	"github.com/go-xorm/xorm"
+	"github.com/grafana/grafana/pkg/components/securejsondata"
 	"github.com/grafana/grafana/pkg/components/simplejson"
-	"github.com/grafana/grafana/pkg/log"
+	"github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
 	"github.com/grafana/grafana/pkg/tsdb"
 	. "github.com/smartystreets/goconvey/convey"
@@ -19,8 +20,9 @@ import (
 // The tests require a MSSQL db named grafanatest and a user/password grafana/Password!
 // Use the docker/blocks/mssql_tests/docker-compose.yaml to spin up a
 // preconfigured MSSQL server suitable for running these tests.
-// There is also a dashboard.json in same directory that you can import to Grafana
-// once you've created a datasource for the test server/database.
+// There is also a datasource and dashboard provisioned by devenv scripts that you can
+// use to verify that the generated data is visualized as expected, see
+// devenv/README.md for setup instructions.
 // If needed, change the variable below to the IP address of the database.
 var serverIP = "localhost"
 
@@ -28,19 +30,25 @@ func TestMSSQL(t *testing.T) {
 	SkipConvey("MSSQL", t, func() {
 		x := InitMSSQLTestDB(t)
 
-		endpoint := &MssqlQueryEndpoint{
-			sqlEngine: &tsdb.DefaultSqlEngine{
-				MacroEngine: NewMssqlMacroEngine(),
-				XormEngine:  x,
-			},
-			log: log.New("tsdb.mssql"),
+		origXormEngine := tsdb.NewXormEngine
+		tsdb.NewXormEngine = func(d, c string) (*xorm.Engine, error) {
+			return x, nil
 		}
 
-		sess := x.NewSession()
-		defer sess.Close()
+		endpoint, err := newMssqlQueryEndpoint(&models.DataSource{
+			JsonData:       simplejson.New(),
+			SecureJsonData: securejsondata.SecureJsonData{},
+		})
+		So(err, ShouldBeNil)
 
+		sess := x.NewSession()
 		fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC).In(time.Local)
 
+		Reset(func() {
+			sess.Close()
+			tsdb.NewXormEngine = origXormEngine
+		})
+
 		Convey("Given a table with different native data types", func() {
 			sql := `
 				IF OBJECT_ID('dbo.[mssql_types]', 'U') IS NOT NULL