2017-10-10 08:19:14 -05:00
package tsdb
import (
2018-07-26 11:09:42 -05:00
"container/list"
2017-10-10 08:19:14 -05:00
"context"
2018-07-26 11:09:42 -05:00
"database/sql"
2018-04-24 12:50:14 -05:00
"fmt"
2018-07-26 11:09:42 -05:00
"math"
2018-09-13 09:51:00 -05:00
"regexp"
2018-08-12 03:51:58 -05:00
"strconv"
2018-07-26 11:09:42 -05:00
"strings"
2017-10-10 08:19:14 -05:00
"sync"
2018-03-20 13:40:10 -05:00
"time"
2017-10-10 08:19:14 -05:00
2019-05-13 01:45:54 -05:00
"github.com/grafana/grafana/pkg/infra/log"
2018-07-26 11:09:42 -05:00
2018-04-24 12:50:14 -05:00
"github.com/grafana/grafana/pkg/components/null"
2017-10-10 08:19:14 -05:00
"github.com/go-xorm/core"
"github.com/go-xorm/xorm"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
)
2017-12-08 16:04:17 -06:00
// SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
// timeRange to be able to generate queries that use from and to.
2017-10-10 08:19:14 -05:00
type SqlMacroEngine interface {
2017-12-08 16:04:17 -06:00
Interpolate ( query * Query , timeRange * TimeRange , sql string ) ( string , error )
2017-10-10 08:19:14 -05:00
}
2018-07-26 11:09:42 -05:00
// SqlTableRowTransformer transforms a query result row to RowValues with proper types.
type SqlTableRowTransformer interface {
Transform ( columnTypes [ ] * sql . ColumnType , rows * core . Rows ) ( RowValues , error )
2017-10-10 08:19:14 -05:00
}
// engineCacheType caches one xorm engine per datasource id, together with the
// datasource version the engine was created for; the embedded mutex guards
// both maps.
type engineCacheType struct {
	cache    map[int64]*xorm.Engine
	versions map[int64]int
	sync.Mutex
}

// engineCache is the package-wide engine cache shared by all sql datasources.
var engineCache = engineCacheType{
	cache:    make(map[int64]*xorm.Engine),
	versions: make(map[int64]int),
}
2018-09-13 09:51:00 -05:00
var sqlIntervalCalculator = NewIntervalCalculator ( nil )
2018-07-26 11:09:42 -05:00
var NewXormEngine = func ( driverName string , connectionString string ) ( * xorm . Engine , error ) {
return xorm . NewEngine ( driverName , connectionString )
}
// sqlQueryEndpoint executes queries against a sql datasource through a shared
// xorm engine and transforms resultsets into time series or tables.
type sqlQueryEndpoint struct {
	macroEngine       SqlMacroEngine
	rowTransformer    SqlTableRowTransformer
	engine            *xorm.Engine
	timeColumnNames   []string
	metricColumnTypes []string
	log               log.Logger
}

// SqlQueryEndpointConfiguration holds everything needed to construct a
// sqlQueryEndpoint for a specific datasource.
type SqlQueryEndpointConfiguration struct {
	DriverName        string
	Datasource        *models.DataSource
	ConnectionString  string
	TimeColumnNames   []string
	MetricColumnTypes []string
}
var NewSqlQueryEndpoint = func ( config * SqlQueryEndpointConfiguration , rowTransformer SqlTableRowTransformer , macroEngine SqlMacroEngine , log log . Logger ) ( TsdbQueryEndpoint , error ) {
queryEndpoint := sqlQueryEndpoint {
rowTransformer : rowTransformer ,
macroEngine : macroEngine ,
timeColumnNames : [ ] string { "time" } ,
log : log ,
}
if len ( config . TimeColumnNames ) > 0 {
queryEndpoint . timeColumnNames = config . TimeColumnNames
}
2018-07-30 06:50:18 -05:00
if len ( config . MetricColumnTypes ) > 0 {
queryEndpoint . metricColumnTypes = config . MetricColumnTypes
}
2017-10-10 08:19:14 -05:00
engineCache . Lock ( )
defer engineCache . Unlock ( )
2018-07-26 11:09:42 -05:00
if engine , present := engineCache . cache [ config . Datasource . Id ] ; present {
if version := engineCache . versions [ config . Datasource . Id ] ; version == config . Datasource . Version {
queryEndpoint . engine = engine
return & queryEndpoint , nil
2017-10-10 08:19:14 -05:00
}
}
2018-07-26 11:09:42 -05:00
engine , err := NewXormEngine ( config . DriverName , config . ConnectionString )
2017-10-10 08:19:14 -05:00
if err != nil {
2018-07-26 11:09:42 -05:00
return nil , err
2017-10-10 08:19:14 -05:00
}
2018-09-24 07:35:13 -05:00
maxOpenConns := config . Datasource . JsonData . Get ( "maxOpenConns" ) . MustInt ( 0 )
engine . SetMaxOpenConns ( maxOpenConns )
maxIdleConns := config . Datasource . JsonData . Get ( "maxIdleConns" ) . MustInt ( 2 )
engine . SetMaxIdleConns ( maxIdleConns )
connMaxLifetime := config . Datasource . JsonData . Get ( "connMaxLifetime" ) . MustInt ( 14400 )
engine . SetConnMaxLifetime ( time . Duration ( connMaxLifetime ) * time . Second )
2017-11-15 04:07:35 -06:00
2018-07-26 11:09:42 -05:00
engineCache . versions [ config . Datasource . Id ] = config . Datasource . Version
engineCache . cache [ config . Datasource . Id ] = engine
queryEndpoint . engine = engine
2017-10-10 08:19:14 -05:00
2018-07-26 11:09:42 -05:00
return & queryEndpoint , nil
2017-10-10 08:19:14 -05:00
}
2018-07-27 06:21:40 -05:00
// rowLimit caps how many rows a single query may return before the transform
// aborts with an error.
const rowLimit = 1000000
2018-07-26 11:09:42 -05:00
// Query is the main function for the SqlQueryEndpoint
func ( e * sqlQueryEndpoint ) Query ( ctx context . Context , dsInfo * models . DataSource , tsdbQuery * TsdbQuery ) ( * Response , error ) {
2017-10-10 08:19:14 -05:00
result := & Response {
Results : make ( map [ string ] * QueryResult ) ,
}
2018-09-24 07:33:45 -05:00
var wg sync . WaitGroup
2017-10-10 08:19:14 -05:00
for _ , query := range tsdbQuery . Queries {
2018-07-26 11:09:42 -05:00
rawSQL := query . Model . Get ( "rawSql" ) . MustString ( )
if rawSQL == "" {
2017-10-10 08:19:14 -05:00
continue
}
queryResult := & QueryResult { Meta : simplejson . New ( ) , RefId : query . RefId }
result . Results [ query . RefId ] = queryResult
2018-09-13 09:51:00 -05:00
// global substitutions
rawSQL , err := Interpolate ( query , tsdbQuery . TimeRange , rawSQL )
if err != nil {
queryResult . Error = err
continue
}
// datasource specific substitutions
rawSQL , err = e . macroEngine . Interpolate ( query , tsdbQuery . TimeRange , rawSQL )
2017-10-10 08:19:14 -05:00
if err != nil {
queryResult . Error = err
continue
}
2018-07-26 11:09:42 -05:00
queryResult . Meta . Set ( "sql" , rawSQL )
2017-10-10 08:19:14 -05:00
2018-09-24 07:33:45 -05:00
wg . Add ( 1 )
2017-10-10 08:19:14 -05:00
2018-09-24 07:33:45 -05:00
go func ( rawSQL string , query * Query , queryResult * QueryResult ) {
defer wg . Done ( )
session := e . engine . NewSession ( )
defer session . Close ( )
db := session . DB ( )
2017-10-10 08:19:14 -05:00
2018-09-24 07:33:45 -05:00
rows , err := db . Query ( rawSQL )
2017-10-10 08:19:14 -05:00
if err != nil {
queryResult . Error = err
2018-09-24 07:33:45 -05:00
return
2017-10-10 08:19:14 -05:00
}
2018-09-24 07:33:45 -05:00
defer rows . Close ( )
format := query . Model . Get ( "format" ) . MustString ( "time_series" )
switch format {
case "time_series" :
err := e . transformToTimeSeries ( query , rows , queryResult , tsdbQuery )
if err != nil {
queryResult . Error = err
return
}
case "table" :
err := e . transformToTable ( query , rows , queryResult , tsdbQuery )
if err != nil {
queryResult . Error = err
return
}
2017-10-10 08:19:14 -05:00
}
2018-09-24 07:33:45 -05:00
} ( rawSQL , query , queryResult )
2017-10-10 08:19:14 -05:00
}
2018-09-24 07:33:45 -05:00
wg . Wait ( )
2017-10-10 08:19:14 -05:00
return result , nil
}
2018-03-20 13:40:10 -05:00
2018-09-13 09:51:00 -05:00
// global macros/substitutions for all sql datasources
var Interpolate = func ( query * Query , timeRange * TimeRange , sql string ) ( string , error ) {
minInterval , err := GetIntervalFrom ( query . DataSource , query . Model , time . Second * 60 )
if err != nil {
return sql , nil
}
interval := sqlIntervalCalculator . Calculate ( timeRange , minInterval )
sql = strings . Replace ( sql , "$__interval_ms" , strconv . FormatInt ( interval . Milliseconds ( ) , 10 ) , - 1 )
sql = strings . Replace ( sql , "$__interval" , interval . Text , - 1 )
2018-10-02 13:19:34 -05:00
sql = strings . Replace ( sql , "$__unixEpochFrom()" , fmt . Sprintf ( "%d" , timeRange . GetFromAsSecondsEpoch ( ) ) , - 1 )
sql = strings . Replace ( sql , "$__unixEpochTo()" , fmt . Sprintf ( "%d" , timeRange . GetToAsSecondsEpoch ( ) ) , - 1 )
2018-09-13 09:51:00 -05:00
return sql , nil
}
2018-07-26 11:09:42 -05:00
func ( e * sqlQueryEndpoint ) transformToTable ( query * Query , rows * core . Rows , result * QueryResult , tsdbQuery * TsdbQuery ) error {
columnNames , err := rows . Columns ( )
columnCount := len ( columnNames )
if err != nil {
return err
}
rowCount := 0
timeIndex := - 1
table := & Table {
Columns : make ( [ ] TableColumn , columnCount ) ,
Rows : make ( [ ] RowValues , 0 ) ,
}
for i , name := range columnNames {
table . Columns [ i ] . Text = name
for _ , tc := range e . timeColumnNames {
if name == tc {
timeIndex = i
break
}
}
}
columnTypes , err := rows . ColumnTypes ( )
if err != nil {
return err
}
for ; rows . Next ( ) ; rowCount ++ {
if rowCount > rowLimit {
return fmt . Errorf ( "query row limit exceeded, limit %d" , rowLimit )
}
values , err := e . rowTransformer . Transform ( columnTypes , rows )
if err != nil {
return err
}
// converts column named time to unix timestamp in milliseconds
// to make native mssql datetime types and epoch dates work in
// annotation and table queries.
ConvertSqlTimeColumnToEpochMs ( values , timeIndex )
table . Rows = append ( table . Rows , values )
}
result . Tables = append ( result . Tables , table )
result . Meta . Set ( "rowCount" , rowCount )
return nil
}
// transformToTimeSeries converts the sql resultset in rows into time series
// on result. A time column (one of e.timeColumnNames) is mandatory. The
// metric name comes from a column named "metric", or failing that from the
// first column whose database type is listed in e.metricColumnTypes; every
// remaining column becomes a value series. When fill is enabled on the query
// model, gaps between points (and the tail up to the range end) are padded at
// fillInterval with null, the previous value, or a fixed value.
func (e *sqlQueryEndpoint) transformToTimeSeries(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error {
	pointsBySeries := make(map[string]*TimeSeries)
	// Preserves first-seen series order, since map iteration order is random.
	seriesByQueryOrder := list.New()

	columnNames, err := rows.Columns()
	if err != nil {
		return err
	}

	columnTypes, err := rows.ColumnTypes()
	if err != nil {
		return err
	}

	rowCount := 0
	timeIndex := -1
	metricIndex := -1
	metricPrefix := false
	var metricPrefixValue string

	// check columns of resultset: a column named time is mandatory
	// the first text column is treated as metric name unless a column named metric is present
	for i, col := range columnNames {
		for _, tc := range e.timeColumnNames {
			if col == tc {
				timeIndex = i
				// NOTE(review): this continue only skips the inner loop
				// iteration; if several columns match a time name, the last
				// one wins.
				continue
			}
		}
		switch col {
		case "metric":
			metricIndex = i
		default:
			if metricIndex == -1 {
				columnType := columnTypes[i].DatabaseTypeName()
				for _, mct := range e.metricColumnTypes {
					if columnType == mct {
						metricIndex = i
						// NOTE(review): same as above — continues the inner
						// loop only.
						continue
					}
				}
			}
		}
	}

	// use metric column as prefix with multiple value columns
	if metricIndex != -1 && len(columnNames) > 3 {
		metricPrefix = true
	}

	if timeIndex == -1 {
		return fmt.Errorf("Found no column named %s", strings.Join(e.timeColumnNames, " or "))
	}

	// Fill configuration from the query model; fillInterval arrives in
	// seconds and is converted to milliseconds here.
	fillMissing := query.Model.Get("fill").MustBool(false)
	var fillInterval float64
	fillValue := null.Float{}
	fillPrevious := false

	if fillMissing {
		fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000
		switch query.Model.Get("fillMode").MustString() {
		case "null":
			// fillValue stays invalid -> null points.
		case "previous":
			fillPrevious = true
		case "value":
			fillValue.Float64 = query.Model.Get("fillValue").MustFloat64()
			fillValue.Valid = true
		}
	}

	for rows.Next() {
		var timestamp float64
		var value null.Float
		var metric string

		if rowCount > rowLimit {
			return fmt.Errorf("query row limit exceeded, limit %d", rowLimit)
		}

		values, err := e.rowTransformer.Transform(columnTypes, rows)
		if err != nil {
			return err
		}

		// converts column named time to unix timestamp in milliseconds to make
		// native mysql datetime types and epoch dates work in
		// annotation and table queries.
		ConvertSqlTimeColumnToEpochMs(values, timeIndex)

		// After conversion the time column must be an int64 or float64 epoch.
		switch columnValue := values[timeIndex].(type) {
		case int64:
			timestamp = float64(columnValue)
		case float64:
			timestamp = columnValue
		default:
			return fmt.Errorf("Invalid type for column time, must be of type timestamp or unix timestamp, got: %T %v", columnValue, columnValue)
		}

		if metricIndex >= 0 {
			if columnValue, ok := values[metricIndex].(string); ok {
				if metricPrefix {
					metricPrefixValue = columnValue
				} else {
					metric = columnValue
				}
			} else {
				return fmt.Errorf("Column metric must be of type %s. metric column name: %s type: %s but datatype is %T", strings.Join(e.metricColumnTypes, ", "), columnNames[metricIndex], columnTypes[metricIndex].DatabaseTypeName(), values[metricIndex])
			}
		}

		// Every non-time, non-metric column contributes one point per row.
		for i, col := range columnNames {
			if i == timeIndex || i == metricIndex {
				continue
			}

			if value, err = ConvertSqlValueColumnToFloat(col, values[i]); err != nil {
				return err
			}

			if metricIndex == -1 {
				metric = col
			} else if metricPrefix {
				metric = metricPrefixValue + " " + col
			}

			series, exist := pointsBySeries[metric]
			if !exist {
				series = &TimeSeries{Name: metric}
				pointsBySeries[metric] = series
				seriesByQueryOrder.PushBack(metric)
			}

			if fillMissing {
				var intervalStart float64
				if !exist {
					// First point of the series: fill from the range start.
					intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
				} else {
					intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval
				}

				if fillPrevious {
					// TimePoint index 0 is the value, index 1 the timestamp.
					if len(series.Points) > 0 {
						fillValue = series.Points[len(series.Points)-1][0]
					} else {
						fillValue.Valid = false
					}
				}

				// align interval start
				intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval

				// Pad every missing interval up to (not including) this row's
				// timestamp. Fill points count towards rowLimit via rowCount.
				for i := intervalStart; i < timestamp; i += fillInterval {
					series.Points = append(series.Points, TimePoint{fillValue, null.FloatFrom(i)})
					rowCount++
				}
			}

			series.Points = append(series.Points, TimePoint{value, null.FloatFrom(timestamp)})
			e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
		}
	}

	// Emit series in first-seen order and, if filling, pad each one out to the
	// end of the query time range.
	for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
		key := elem.Value.(string)
		result.Series = append(result.Series, pointsBySeries[key])

		if fillMissing {
			series := pointsBySeries[key]
			// fill in values from last fetched value till interval end
			intervalStart := series.Points[len(series.Points)-1][1].Float64
			intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)

			if fillPrevious {
				if len(series.Points) > 0 {
					fillValue = series.Points[len(series.Points)-1][0]
				} else {
					fillValue.Valid = false
				}
			}

			// align interval start
			intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
			for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
				series.Points = append(series.Points, TimePoint{fillValue, null.FloatFrom(i)})
				rowCount++
			}
		}
	}

	result.Meta.Set("rowCount", rowCount)
	return nil
}
2018-04-10 03:32:30 -05:00
// ConvertSqlTimeColumnToEpochMs converts column named time to unix timestamp in milliseconds
2018-03-20 13:40:10 -05:00
// to make native datetime types and epoch dates work in annotation and table queries.
func ConvertSqlTimeColumnToEpochMs ( values RowValues , timeIndex int ) {
2018-05-02 07:06:46 -05:00
if timeIndex >= 0 {
switch value := values [ timeIndex ] . ( type ) {
case time . Time :
2018-05-28 09:57:51 -05:00
values [ timeIndex ] = float64 ( value . UnixNano ( ) ) / float64 ( time . Millisecond )
2018-05-02 07:06:46 -05:00
case * time . Time :
if value != nil {
2018-05-28 09:57:51 -05:00
values [ timeIndex ] = float64 ( ( * value ) . UnixNano ( ) ) / float64 ( time . Millisecond )
2018-05-02 07:06:46 -05:00
}
case int64 :
values [ timeIndex ] = int64 ( EpochPrecisionToMs ( float64 ( value ) ) )
case * int64 :
if value != nil {
values [ timeIndex ] = int64 ( EpochPrecisionToMs ( float64 ( * value ) ) )
}
case uint64 :
values [ timeIndex ] = int64 ( EpochPrecisionToMs ( float64 ( value ) ) )
case * uint64 :
if value != nil {
values [ timeIndex ] = int64 ( EpochPrecisionToMs ( float64 ( * value ) ) )
}
case int32 :
values [ timeIndex ] = int64 ( EpochPrecisionToMs ( float64 ( value ) ) )
case * int32 :
if value != nil {
values [ timeIndex ] = int64 ( EpochPrecisionToMs ( float64 ( * value ) ) )
}
case uint32 :
values [ timeIndex ] = int64 ( EpochPrecisionToMs ( float64 ( value ) ) )
case * uint32 :
if value != nil {
values [ timeIndex ] = int64 ( EpochPrecisionToMs ( float64 ( * value ) ) )
}
case float64 :
values [ timeIndex ] = EpochPrecisionToMs ( value )
case * float64 :
if value != nil {
values [ timeIndex ] = EpochPrecisionToMs ( * value )
}
case float32 :
values [ timeIndex ] = EpochPrecisionToMs ( float64 ( value ) )
case * float32 :
if value != nil {
values [ timeIndex ] = EpochPrecisionToMs ( float64 ( * value ) )
}
2018-03-20 13:40:10 -05:00
}
}
}
2018-04-24 12:50:14 -05:00
// ConvertSqlValueColumnToFloat converts timeseries value column to float.
func ConvertSqlValueColumnToFloat ( columnName string , columnValue interface { } ) ( null . Float , error ) {
var value null . Float
switch typedValue := columnValue . ( type ) {
case int :
value = null . FloatFrom ( float64 ( typedValue ) )
case * int :
if typedValue == nil {
value . Valid = false
} else {
value = null . FloatFrom ( float64 ( * typedValue ) )
}
case int64 :
value = null . FloatFrom ( float64 ( typedValue ) )
case * int64 :
if typedValue == nil {
value . Valid = false
} else {
value = null . FloatFrom ( float64 ( * typedValue ) )
}
case int32 :
value = null . FloatFrom ( float64 ( typedValue ) )
case * int32 :
if typedValue == nil {
value . Valid = false
} else {
value = null . FloatFrom ( float64 ( * typedValue ) )
}
case int16 :
value = null . FloatFrom ( float64 ( typedValue ) )
case * int16 :
if typedValue == nil {
value . Valid = false
} else {
value = null . FloatFrom ( float64 ( * typedValue ) )
}
case int8 :
value = null . FloatFrom ( float64 ( typedValue ) )
case * int8 :
if typedValue == nil {
value . Valid = false
} else {
value = null . FloatFrom ( float64 ( * typedValue ) )
}
case uint :
value = null . FloatFrom ( float64 ( typedValue ) )
case * uint :
if typedValue == nil {
value . Valid = false
} else {
value = null . FloatFrom ( float64 ( * typedValue ) )
}
case uint64 :
value = null . FloatFrom ( float64 ( typedValue ) )
case * uint64 :
if typedValue == nil {
value . Valid = false
} else {
value = null . FloatFrom ( float64 ( * typedValue ) )
}
case uint32 :
value = null . FloatFrom ( float64 ( typedValue ) )
case * uint32 :
if typedValue == nil {
value . Valid = false
} else {
value = null . FloatFrom ( float64 ( * typedValue ) )
}
case uint16 :
value = null . FloatFrom ( float64 ( typedValue ) )
case * uint16 :
if typedValue == nil {
value . Valid = false
} else {
value = null . FloatFrom ( float64 ( * typedValue ) )
}
case uint8 :
value = null . FloatFrom ( float64 ( typedValue ) )
case * uint8 :
if typedValue == nil {
value . Valid = false
} else {
value = null . FloatFrom ( float64 ( * typedValue ) )
}
case float64 :
value = null . FloatFrom ( typedValue )
case * float64 :
value = null . FloatFromPtr ( typedValue )
case float32 :
value = null . FloatFrom ( float64 ( typedValue ) )
case * float32 :
if typedValue == nil {
value . Valid = false
} else {
value = null . FloatFrom ( float64 ( * typedValue ) )
}
case nil :
value . Valid = false
default :
return null . NewFloat ( 0 , false ) , fmt . Errorf ( "Value column must have numeric datatype, column: %s type: %T value: %v" , columnName , typedValue , typedValue )
}
return value , nil
}
2018-08-12 03:51:58 -05:00
func SetupFillmode ( query * Query , interval time . Duration , fillmode string ) error {
query . Model . Set ( "fill" , true )
query . Model . Set ( "fillInterval" , interval . Seconds ( ) )
switch fillmode {
case "NULL" :
query . Model . Set ( "fillMode" , "null" )
case "previous" :
query . Model . Set ( "fillMode" , "previous" )
default :
query . Model . Set ( "fillMode" , "value" )
floatVal , err := strconv . ParseFloat ( fillmode , 64 )
if err != nil {
return fmt . Errorf ( "error parsing fill value %v" , fillmode )
}
query . Model . Set ( "fillValue" , floatVal )
}
return nil
}
2018-09-13 09:51:00 -05:00
type SqlMacroEngineBase struct { }
func NewSqlMacroEngineBase ( ) * SqlMacroEngineBase {
return & SqlMacroEngineBase { }
}
func ( m * SqlMacroEngineBase ) ReplaceAllStringSubmatchFunc ( re * regexp . Regexp , str string , repl func ( [ ] string ) string ) string {
result := ""
lastIndex := 0
for _ , v := range re . FindAllSubmatchIndex ( [ ] byte ( str ) , - 1 ) {
groups := [ ] string { }
for i := 0 ; i < len ( v ) ; i += 2 {
groups = append ( groups , str [ v [ i ] : v [ i + 1 ] ] )
}
result += str [ lastIndex : v [ 0 ] ] + repl ( groups )
lastIndex = v [ 1 ]
}
return result + str [ lastIndex : ]
}