diff --git a/pkg/tsdb/mysql/macros.go b/pkg/tsdb/mysql/macros.go index 584f731f3b8..078d1ff54f8 100644 --- a/pkg/tsdb/mysql/macros.go +++ b/pkg/tsdb/mysql/macros.go @@ -14,18 +14,18 @@ import ( const rsIdentifier = `([_a-zA-Z0-9]+)` const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)` -type MySqlMacroEngine struct { - TimeRange *tsdb.TimeRange - Query *tsdb.Query +type mySqlMacroEngine struct { + timeRange *tsdb.TimeRange + query *tsdb.Query } -func NewMysqlMacroEngine() tsdb.SqlMacroEngine { - return &MySqlMacroEngine{} +func newMysqlMacroEngine() tsdb.SqlMacroEngine { + return &mySqlMacroEngine{} } -func (m *MySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) { - m.TimeRange = timeRange - m.Query = query +func (m *mySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) { + m.timeRange = timeRange + m.query = query rExp, _ := regexp.Compile(sExpr) var macroError error @@ -66,7 +66,7 @@ func replaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]str return result + str[lastIndex:] } -func (m *MySqlMacroEngine) evaluateMacro(name string, args []string) (string, error) { +func (m *mySqlMacroEngine) evaluateMacro(name string, args []string) (string, error) { switch name { case "__timeEpoch", "__time": if len(args) == 0 { @@ -78,11 +78,11 @@ func (m *MySqlMacroEngine) evaluateMacro(name string, args []string) (string, er return "", fmt.Errorf("missing time column argument for macro %v", name) } - return fmt.Sprintf("%s BETWEEN '%s' AND '%s'", args[0], m.TimeRange.GetFromAsTimeUTC().Format(time.RFC3339), m.TimeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil + return fmt.Sprintf("%s BETWEEN '%s' AND '%s'", args[0], m.timeRange.GetFromAsTimeUTC().Format(time.RFC3339), m.timeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil case "__timeFrom": - return fmt.Sprintf("'%s'", m.TimeRange.GetFromAsTimeUTC().Format(time.RFC3339)), nil + return fmt.Sprintf("'%s'", m.timeRange.GetFromAsTimeUTC().Format(time.RFC3339)), nil case "__timeTo": - return fmt.Sprintf("'%s'", m.TimeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil + return fmt.Sprintf("'%s'", m.timeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil case "__timeGroup": if len(args) < 2 { return "", fmt.Errorf("macro %v needs time column and interval", name) @@ -92,16 +92,16 @@ func (m *MySqlMacroEngine) evaluateMacro(name string, args []string) (string, er return "", fmt.Errorf("error parsing interval %v", args[1]) } if len(args) == 3 { - m.Query.Model.Set("fill", true) - m.Query.Model.Set("fillInterval", interval.Seconds()) + m.query.Model.Set("fill", true) + m.query.Model.Set("fillInterval", interval.Seconds()) if args[2] == "NULL" { - m.Query.Model.Set("fillNull", true) + m.query.Model.Set("fillNull", true) } else { floatVal, err := strconv.ParseFloat(args[2], 64) if err != nil { return "", fmt.Errorf("error parsing fill value %v", args[2]) } - m.Query.Model.Set("fillValue", floatVal) + m.query.Model.Set("fillValue", floatVal) } } return fmt.Sprintf("UNIX_TIMESTAMP(%s) DIV %.0f * %.0f", args[0], interval.Seconds(), interval.Seconds()), nil @@ -109,11 +109,11 @@ func (m *MySqlMacroEngine) evaluateMacro(name string, args []string) (string, er if len(args) == 0 { return "", fmt.Errorf("missing time column argument for macro %v", name) } - return fmt.Sprintf("%s >= %d AND %s <= %d", args[0], m.TimeRange.GetFromAsSecondsEpoch(), args[0], m.TimeRange.GetToAsSecondsEpoch()), nil + return fmt.Sprintf("%s >= %d AND %s <= %d", args[0], m.timeRange.GetFromAsSecondsEpoch(), args[0], m.timeRange.GetToAsSecondsEpoch()), nil case "__unixEpochFrom": - return fmt.Sprintf("%d", m.TimeRange.GetFromAsSecondsEpoch()), nil + return fmt.Sprintf("%d", m.timeRange.GetFromAsSecondsEpoch()), nil case "__unixEpochTo": - return fmt.Sprintf("%d", m.TimeRange.GetToAsSecondsEpoch()), nil + return fmt.Sprintf("%d", m.timeRange.GetToAsSecondsEpoch()), nil default: return "", fmt.Errorf("Unknown macro %v", name) } diff --git a/pkg/tsdb/mysql/macros_test.go b/pkg/tsdb/mysql/macros_test.go index 2561661b385..003af9a737f 100644 --- a/pkg/tsdb/mysql/macros_test.go +++ b/pkg/tsdb/mysql/macros_test.go @@ -12,7 +12,7 @@ import ( func TestMacroEngine(t *testing.T) { Convey("MacroEngine", t, func() { - engine := &MySqlMacroEngine{} + engine := &mySqlMacroEngine{} query := &tsdb.Query{} Convey("Given a time range between 2018-04-12 00:00 and 2018-04-12 00:05", func() { diff --git a/pkg/tsdb/mysql/mysql.go b/pkg/tsdb/mysql/mysql.go index 7eceaffdb09..645f6b49bbb 100644 --- a/pkg/tsdb/mysql/mysql.go +++ b/pkg/tsdb/mysql/mysql.go @@ -1,39 +1,24 @@ package mysql import ( - "container/list" - "context" "database/sql" "fmt" - "math" "reflect" "strconv" "github.com/go-sql-driver/mysql" "github.com/go-xorm/core" - "github.com/grafana/grafana/pkg/components/null" "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/tsdb" ) -type MysqlQueryEndpoint struct { - sqlEngine tsdb.SqlEngine - log log.Logger -} - func init() { - tsdb.RegisterTsdbQueryEndpoint("mysql", NewMysqlQueryEndpoint) + tsdb.RegisterTsdbQueryEndpoint("mysql", newMysqlQueryEndpoint) } -func NewMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { - endpoint := &MysqlQueryEndpoint{ - log: log.New("tsdb.mysql"), - } - - endpoint.sqlEngine = &tsdb.DefaultSqlEngine{ - MacroEngine: NewMysqlMacroEngine(), - } +func newMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { + logger := log.New("tsdb.mysql") cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&parseTime=true&loc=UTC&allowNativePasswords=true", datasource.User, @@ -42,85 +27,35 @@ func NewMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoin datasource.Url, datasource.Database, ) - endpoint.log.Debug("getEngine", "connection", cnnstr) + logger.Debug("getEngine", "connection", cnnstr) - if err := endpoint.sqlEngine.InitEngine("mysql", datasource, cnnstr); err != nil { - return nil, err + config := tsdb.SqlQueryEndpointConfiguration{ + DriverName: "mysql", + ConnectionString: cnnstr, + Datasource: datasource, + TimeColumnNames: []string{"time", "time_sec"}, + MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"}, } - return endpoint, nil + rowTransformer := mysqlRowTransformer{ + log: logger, + } + + return tsdb.NewSqlQueryEndpoint(&config, &rowTransformer, newMysqlMacroEngine(), logger) } -// Query is the main function for the MysqlExecutor -func (e *MysqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) { - return e.sqlEngine.Query(ctx, dsInfo, tsdbQuery, e.transformToTimeSeries, e.transformToTable) +type mysqlRowTransformer struct { + log log.Logger } -func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error { - columnNames, err := rows.Columns() - columnCount := len(columnNames) - - if err != nil { - return err - } - - table := &tsdb.Table{ - Columns: make([]tsdb.TableColumn, columnCount), - Rows: make([]tsdb.RowValues, 0), - } - - for i, name := range columnNames { - table.Columns[i].Text = name - } - - rowLimit := 1000000 - rowCount := 0 - timeIndex := -1 - - // check if there is a column named time - for i, col := range columnNames { - switch col { - case "time", "time_sec": - timeIndex = i - } - } - - for ; rows.Next(); rowCount++ { - if rowCount > rowLimit { - return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit) - } - - values, err := e.getTypedRowData(rows) - if err != nil { - return err - } - - // converts column named time to unix timestamp in milliseconds to make - // native mysql datetime types and epoch dates work in - // annotation and table queries. - tsdb.ConvertSqlTimeColumnToEpochMs(values, timeIndex) - - table.Rows = append(table.Rows, values) - } - - result.Tables = append(result.Tables, table) - result.Meta.Set("rowCount", rowCount) - return nil -} - -func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, error) { - types, err := rows.ColumnTypes() - if err != nil { - return nil, err - } - - values := make([]interface{}, len(types)) +func (t *mysqlRowTransformer) Transform(columnTypes []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) { + values := make([]interface{}, len(columnTypes)) for i := range values { - scanType := types[i].ScanType() + scanType := columnTypes[i].ScanType() values[i] = reflect.New(scanType).Interface() - if types[i].DatabaseTypeName() == "BIT" { + if columnTypes[i].DatabaseTypeName() == "BIT" { values[i] = new([]byte) } } @@ -129,7 +64,7 @@ func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, er return nil, err } - for i := 0; i < len(types); i++ { + for i := 0; i < len(columnTypes); i++ { typeName := reflect.ValueOf(values[i]).Type().String() switch typeName { @@ -158,7 +93,7 @@ func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, er } } - if types[i].DatabaseTypeName() == "DECIMAL" { + if columnTypes[i].DatabaseTypeName() == "DECIMAL" { f, err := strconv.ParseFloat(values[i].(string), 64) if err == nil { @@ -171,159 +106,3 @@ func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, er return values, nil } - -func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error { - pointsBySeries := make(map[string]*tsdb.TimeSeries) - seriesByQueryOrder := list.New() - - columnNames, err := rows.Columns() - if err != nil { - return err - } - - columnTypes, err := rows.ColumnTypes() - if err != nil { - return err - } - - rowLimit := 1000000 - rowCount := 0 - timeIndex := -1 - metricIndex := -1 - - // check columns of resultset: a column named time is mandatory - // the first text column is treated as metric name unless a column named metric is present - for i, col := range columnNames { - switch col { - case "time", "time_sec": - timeIndex = i - case "metric": - metricIndex = i - default: - if metricIndex == -1 { - switch columnTypes[i].DatabaseTypeName() { - case "CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT": - metricIndex = i - } - } - } - } - - if timeIndex == -1 { - return fmt.Errorf("Found no column named time or time_sec") - } - - fillMissing := query.Model.Get("fill").MustBool(false) - var fillInterval float64 - fillValue := null.Float{} - if fillMissing { - fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000 - if !query.Model.Get("fillNull").MustBool(false) { - fillValue.Float64 = query.Model.Get("fillValue").MustFloat64() - fillValue.Valid = true - } - } - - for rows.Next() { - var timestamp float64 - var value null.Float - var metric string - - if rowCount > rowLimit { - return fmt.Errorf("PostgreSQL query row limit exceeded, limit %d", rowLimit) - } - - values, err := e.getTypedRowData(rows) - if err != nil { - return err - } - - // converts column named time to unix timestamp in milliseconds to make - // native mysql datetime types and epoch dates work in - // annotation and table queries. - tsdb.ConvertSqlTimeColumnToEpochMs(values, timeIndex) - - switch columnValue := values[timeIndex].(type) { - case int64: - timestamp = float64(columnValue) - case float64: - timestamp = columnValue - default: - return fmt.Errorf("Invalid type for column time/time_sec, must be of type timestamp or unix timestamp, got: %T %v", columnValue, columnValue) - } - - if metricIndex >= 0 { - if columnValue, ok := values[metricIndex].(string); ok { - metric = columnValue - } else { - return fmt.Errorf("Column metric must be of type char,varchar or text, got: %T %v", values[metricIndex], values[metricIndex]) - } - } - - for i, col := range columnNames { - if i == timeIndex || i == metricIndex { - continue - } - - if value, err = tsdb.ConvertSqlValueColumnToFloat(col, values[i]); err != nil { - return err - } - - if metricIndex == -1 { - metric = col - } - - series, exist := pointsBySeries[metric] - if !exist { - series = &tsdb.TimeSeries{Name: metric} - pointsBySeries[metric] = series - seriesByQueryOrder.PushBack(metric) - } - - if fillMissing { - var intervalStart float64 - if !exist { - intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6) - } else { - intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval - } - - // align interval start - intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval - - for i := intervalStart; i < timestamp; i += fillInterval { - series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)}) - rowCount++ - } - } - - series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)}) - - e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value) - rowCount++ - - } - } - - for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() { - key := elem.Value.(string) - result.Series = append(result.Series, pointsBySeries[key]) - - if fillMissing { - series := pointsBySeries[key] - // fill in values from last fetched value till interval end - intervalStart := series.Points[len(series.Points)-1][1].Float64 - intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6) - - // align interval start - intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval - for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval { - series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)}) - rowCount++ - } - } - } - - result.Meta.Set("rowCount", rowCount) - return nil -} diff --git a/pkg/tsdb/mysql/mysql_test.go b/pkg/tsdb/mysql/mysql_test.go index 850a37617e2..3b4e283b726 100644 --- a/pkg/tsdb/mysql/mysql_test.go +++ b/pkg/tsdb/mysql/mysql_test.go @@ -8,8 +8,9 @@ import ( "time" "github.com/go-xorm/xorm" + "github.com/grafana/grafana/pkg/components/securejsondata" "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/log" + "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/services/sqlstore/sqlutil" "github.com/grafana/grafana/pkg/tsdb" @@ -21,8 +22,9 @@ import ( // The tests require a MySQL db named grafana_ds_tests and a user/password grafana/password // Use the docker/blocks/mysql_tests/docker-compose.yaml to spin up a // preconfigured MySQL server suitable for running these tests. -// There is also a dashboard.json in same directory that you can import to Grafana -// once you've created a datasource for the test server/database. +// There is also a datasource and dashboard provisioned by devenv scripts that you can +// use to verify that the generated data are vizualized as expected, see +// devenv/README.md for setup instructions. func TestMySQL(t *testing.T) { // change to true to run the MySQL tests runMySqlTests := false @@ -35,19 +37,25 @@ func TestMySQL(t *testing.T) { Convey("MySQL", t, func() { x := InitMySQLTestDB(t) - endpoint := &MysqlQueryEndpoint{ - sqlEngine: &tsdb.DefaultSqlEngine{ - MacroEngine: NewMysqlMacroEngine(), - XormEngine: x, - }, - log: log.New("tsdb.mysql"), + origXormEngine := tsdb.NewXormEngine + tsdb.NewXormEngine = func(d, c string) (*xorm.Engine, error) { + return x, nil } - sess := x.NewSession() - defer sess.Close() + endpoint, err := newMysqlQueryEndpoint(&models.DataSource{ + JsonData: simplejson.New(), + SecureJsonData: securejsondata.SecureJsonData{}, + }) + So(err, ShouldBeNil) + sess := x.NewSession() fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC) + Reset(func() { + sess.Close() + tsdb.NewXormEngine = origXormEngine + }) + Convey("Given a table with different native data types", func() { if exists, err := sess.IsTableExist("mysql_types"); err != nil || exists { So(err, ShouldBeNil)