2017-12-02 05:40:12 -06:00
package mssql
import (
"container/list"
"context"
"database/sql"
"fmt"
2018-03-16 08:37:16 -05:00
"strconv"
2017-12-02 05:40:12 -06:00
"strings"
2018-03-16 08:37:16 -05:00
"math"
2017-12-02 05:40:12 -06:00
_ "github.com/denisenkom/go-mssqldb"
"github.com/go-xorm/core"
"github.com/grafana/grafana/pkg/components/null"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
)
// MssqlQueryEndpoint is the query endpoint registered under the name "mssql".
// It is created by NewMssqlQueryEndpoint and answers queries through Query.
type MssqlQueryEndpoint struct {
// sqlEngine executes the queries; NewMssqlQueryEndpoint sets it to a
// tsdb.DefaultSqlEngine configured with the MSSQL macro engine.
sqlEngine tsdb . SqlEngine
// log is the endpoint's logger, created with the name "tsdb.mssql".
log log . Logger
}
// init registers NewMssqlQueryEndpoint with the tsdb package under the
// name "mssql" when this package is loaded.
func init ( ) {
tsdb . RegisterTsdbQueryEndpoint ( "mssql" , NewMssqlQueryEndpoint )
}
func NewMssqlQueryEndpoint ( datasource * models . DataSource ) ( tsdb . TsdbQueryEndpoint , error ) {
endpoint := & MssqlQueryEndpoint {
log : log . New ( "tsdb.mssql" ) ,
}
endpoint . sqlEngine = & tsdb . DefaultSqlEngine {
MacroEngine : NewMssqlMacroEngine ( ) ,
}
2018-03-19 11:32:51 -05:00
cnnstr := generateConnectionString ( datasource )
endpoint . log . Debug ( "getEngine" , "connection" , cnnstr )
if err := endpoint . sqlEngine . InitEngine ( "mssql" , datasource , cnnstr ) ; err != nil {
return nil , err
}
return endpoint , nil
}
func generateConnectionString ( datasource * models . DataSource ) string {
password := ""
for key , value := range datasource . SecureJsonData . Decrypt ( ) {
if key == "password" {
password = value
break
}
}
2018-03-19 07:24:31 -05:00
hostParts := strings . Split ( datasource . Url , ":" )
if len ( hostParts ) < 2 {
hostParts = append ( hostParts , "1433" )
}
server , port := hostParts [ 0 ] , hostParts [ 1 ]
2018-03-19 11:32:51 -05:00
return fmt . Sprintf ( "server=%s;port=%s;database=%s;user id=%s;password=%s;" ,
2017-12-02 05:40:12 -06:00
server ,
port ,
datasource . Database ,
datasource . User ,
2018-03-19 11:32:51 -05:00
password ,
2017-12-02 05:40:12 -06:00
)
}
2018-03-19 07:32:04 -05:00
// Query is the main function for the MssqlQueryEndpoint
2017-12-02 05:40:12 -06:00
func ( e * MssqlQueryEndpoint ) Query ( ctx context . Context , dsInfo * models . DataSource , tsdbQuery * tsdb . TsdbQuery ) ( * tsdb . Response , error ) {
// Delegate to the SQL engine, handing it this endpoint's two row
// transformers (time series and table); its response and error are
// returned unchanged.
return e . sqlEngine . Query ( ctx , dsInfo , tsdbQuery , e . transformToTimeSeries , e . transformToTable )
}
2018-03-13 10:03:02 -05:00
func ( e MssqlQueryEndpoint ) transformToTable ( query * tsdb . Query , rows * core . Rows , result * tsdb . QueryResult , tsdbQuery * tsdb . TsdbQuery ) error {
2017-12-02 05:40:12 -06:00
columnNames , err := rows . Columns ( )
columnCount := len ( columnNames )
if err != nil {
return err
}
2017-12-12 14:43:24 -06:00
rowLimit := 1000000
rowCount := 0
timeIndex := - 1
2017-12-02 05:40:12 -06:00
table := & tsdb . Table {
Columns : make ( [ ] tsdb . TableColumn , columnCount ) ,
Rows : make ( [ ] tsdb . RowValues , 0 ) ,
}
for i , name := range columnNames {
table . Columns [ i ] . Text = name
2017-12-12 14:43:24 -06:00
// check if there is a column named time
switch name {
case "time" :
timeIndex = i
}
2017-12-02 05:40:12 -06:00
}
columnTypes , err := rows . ColumnTypes ( )
if err != nil {
return err
}
for ; rows . Next ( ) ; rowCount ++ {
if rowCount > rowLimit {
return fmt . Errorf ( "MsSQL query row limit exceeded, limit %d" , rowLimit )
}
values , err := e . getTypedRowData ( columnTypes , rows )
if err != nil {
return err
}
2018-03-22 09:23:12 -05:00
// converts column named time to unix timestamp in milliseconds
// to make native mssql datetime types and epoch dates work in
// annotation and table queries.
tsdb . ConvertSqlTimeColumnToEpochMs ( values , timeIndex )
2017-12-02 05:40:12 -06:00
table . Rows = append ( table . Rows , values )
}
result . Tables = append ( result . Tables , table )
result . Meta . Set ( "rowCount" , rowCount )
return nil
}
func ( e MssqlQueryEndpoint ) getTypedRowData ( types [ ] * sql . ColumnType , rows * core . Rows ) ( tsdb . RowValues , error ) {
values := make ( [ ] interface { } , len ( types ) )
2017-12-06 01:32:20 -06:00
valuePtrs := make ( [ ] interface { } , len ( types ) )
2017-12-02 05:40:12 -06:00
for i , stype := range types {
e . log . Debug ( "type" , "type" , stype )
2017-12-06 01:32:20 -06:00
valuePtrs [ i ] = & values [ i ]
2017-12-02 05:40:12 -06:00
}
2017-12-06 01:32:20 -06:00
if err := rows . Scan ( valuePtrs ... ) ; err != nil {
return nil , err
}
2017-12-02 05:40:12 -06:00
2018-03-16 08:37:16 -05:00
// convert types not handled by denisenkom/go-mssqldb
// unhandled types are returned as []byte
for i := 0 ; i < len ( types ) ; i ++ {
if value , ok := values [ i ] . ( [ ] byte ) ; ok == true {
switch types [ i ] . DatabaseTypeName ( ) {
case "MONEY" , "SMALLMONEY" , "DECIMAL" :
if v , err := strconv . ParseFloat ( string ( value ) , 64 ) ; err == nil {
values [ i ] = v
} else {
e . log . Debug ( "Rows" , "Error converting numeric to float" , value )
}
default :
e . log . Debug ( "Rows" , "Unknown database type" , types [ i ] . DatabaseTypeName ( ) , "value" , value )
values [ i ] = string ( value )
}
}
}
2017-12-02 05:40:12 -06:00
return values , nil
}
2018-03-13 10:03:02 -05:00
func ( e MssqlQueryEndpoint ) transformToTimeSeries ( query * tsdb . Query , rows * core . Rows , result * tsdb . QueryResult , tsdbQuery * tsdb . TsdbQuery ) error {
2017-12-02 05:40:12 -06:00
pointsBySeries := make ( map [ string ] * tsdb . TimeSeries )
seriesByQueryOrder := list . New ( )
2017-12-12 14:43:24 -06:00
2017-12-02 05:40:12 -06:00
columnNames , err := rows . Columns ( )
2017-12-12 14:43:24 -06:00
if err != nil {
return err
}
2017-12-02 05:40:12 -06:00
2017-12-12 14:43:24 -06:00
columnTypes , err := rows . ColumnTypes ( )
2017-12-02 05:40:12 -06:00
if err != nil {
return err
}
2017-12-12 14:43:24 -06:00
rowLimit := 1000000
2017-12-02 05:40:12 -06:00
rowCount := 0
2017-12-12 14:43:24 -06:00
timeIndex := - 1
metricIndex := - 1
// check columns of resultset: a column named time is mandatory
// the first text column is treated as metric name unless a column named metric is present
for i , col := range columnNames {
switch col {
case "time" :
timeIndex = i
case "metric" :
metricIndex = i
default :
if metricIndex == - 1 {
switch columnTypes [ i ] . DatabaseTypeName ( ) {
case "VARCHAR" , "CHAR" , "NVARCHAR" , "NCHAR" :
metricIndex = i
}
}
}
}
if timeIndex == - 1 {
return fmt . Errorf ( "Found no column named time" )
}
2018-03-15 09:06:54 -05:00
fillMissing := query . Model . Get ( "fill" ) . MustBool ( false )
var fillInterval float64
fillValue := null . Float { }
if fillMissing {
fillInterval = query . Model . Get ( "fillInterval" ) . MustFloat64 ( ) * 1000
if query . Model . Get ( "fillNull" ) . MustBool ( false ) == false {
fillValue . Float64 = query . Model . Get ( "fillValue" ) . MustFloat64 ( )
fillValue . Valid = true
}
}
2017-12-12 14:43:24 -06:00
for rows . Next ( ) {
var timestamp float64
var value null . Float
var metric string
2017-12-02 05:40:12 -06:00
if rowCount > rowLimit {
2017-12-12 14:43:24 -06:00
return fmt . Errorf ( "MSSQL query row limit exceeded, limit %d" , rowLimit )
2017-12-02 05:40:12 -06:00
}
2017-12-12 14:43:24 -06:00
values , err := e . getTypedRowData ( columnTypes , rows )
2017-12-02 05:40:12 -06:00
if err != nil {
2017-12-12 14:43:24 -06:00
return err
2017-12-02 05:40:12 -06:00
}
2018-04-10 04:10:56 -05:00
// converts column named time to unix timestamp in milliseconds to make
// native mysql datetime types and epoch dates work in
// annotation and table queries.
tsdb . ConvertSqlTimeColumnToEpochMs ( values , timeIndex )
2017-12-12 14:43:24 -06:00
switch columnValue := values [ timeIndex ] . ( type ) {
case int64 :
2018-04-10 04:10:56 -05:00
timestamp = float64 ( columnValue )
2017-12-12 14:43:24 -06:00
case float64 :
2018-04-10 04:10:56 -05:00
timestamp = columnValue
2017-12-12 14:43:24 -06:00
default :
2018-04-10 04:10:56 -05:00
return fmt . Errorf ( "Invalid type for column time, must be of type timestamp or unix timestamp, got: %T %v" , columnValue , columnValue )
2017-12-02 05:40:12 -06:00
}
2017-12-12 14:43:24 -06:00
if metricIndex >= 0 {
if columnValue , ok := values [ metricIndex ] . ( string ) ; ok == true {
2018-03-19 11:14:01 -05:00
metric = columnValue
2017-12-12 14:43:24 -06:00
} else {
return fmt . Errorf ( "Column metric must be of type CHAR, VARCHAR, NCHAR or NVARCHAR. metric column name: %s type: %s but datatype is %T" , columnNames [ metricIndex ] , columnTypes [ metricIndex ] . DatabaseTypeName ( ) , values [ metricIndex ] )
}
2017-12-02 05:40:12 -06:00
}
2017-12-12 14:43:24 -06:00
for i , col := range columnNames {
if i == timeIndex || i == metricIndex {
continue
}
switch columnValue := values [ i ] . ( type ) {
case int64 :
value = null . FloatFrom ( float64 ( columnValue ) )
case float64 :
value = null . FloatFrom ( columnValue )
case nil :
value . Valid = false
default :
return fmt . Errorf ( "Value column must have numeric datatype, column: %s type: %T value: %v" , col , columnValue , columnValue )
}
if metricIndex == - 1 {
metric = col
}
2018-03-15 09:06:54 -05:00
series , exist := pointsBySeries [ metric ]
if exist == false {
series = & tsdb . TimeSeries { Name : metric }
pointsBySeries [ metric ] = series
seriesByQueryOrder . PushBack ( metric )
}
if fillMissing {
var intervalStart float64
if exist == false {
intervalStart = float64 ( tsdbQuery . TimeRange . MustGetFrom ( ) . UnixNano ( ) / 1e6 )
} else {
intervalStart = series . Points [ len ( series . Points ) - 1 ] [ 1 ] . Float64 + fillInterval
}
// align interval start
intervalStart = math . Floor ( intervalStart / fillInterval ) * fillInterval
for i := intervalStart ; i < timestamp ; i += fillInterval {
series . Points = append ( series . Points , tsdb . TimePoint { fillValue , null . FloatFrom ( i ) } )
rowCount ++
}
}
2018-03-19 07:32:04 -05:00
series . Points = append ( series . Points , tsdb . TimePoint { value , null . FloatFrom ( timestamp ) } )
e . log . Debug ( "Rows" , "metric" , metric , "time" , timestamp , "value" , value )
2017-12-02 05:40:12 -06:00
}
}
for elem := seriesByQueryOrder . Front ( ) ; elem != nil ; elem = elem . Next ( ) {
key := elem . Value . ( string )
result . Series = append ( result . Series , pointsBySeries [ key ] )
2018-03-15 09:06:54 -05:00
if fillMissing {
series := pointsBySeries [ key ]
// fill in values from last fetched value till interval end
intervalStart := series . Points [ len ( series . Points ) - 1 ] [ 1 ] . Float64
intervalEnd := float64 ( tsdbQuery . TimeRange . MustGetTo ( ) . UnixNano ( ) / 1e6 )
// align interval start
intervalStart = math . Floor ( intervalStart / fillInterval ) * fillInterval
for i := intervalStart + fillInterval ; i < intervalEnd ; i += fillInterval {
series . Points = append ( series . Points , tsdb . TimePoint { fillValue , null . FloatFrom ( i ) } )
rowCount ++
}
}
2017-12-02 05:40:12 -06:00
}
result . Meta . Set ( "rowCount" , rowCount )
return nil
}