grafana/pkg/tsdb/sqleng/sql_engine.go
Marcus Efraimsson fbdaf56a84
Postgres/MySQL/MSSQL: Fix region annotations not displayed correctly (#38936)
Fix region annotations not displayed correctly when returning timeend column 
as epoch timestamp and by that making sure that the returned data frame field 
named timeend is treated as time type.

Fixes #38533
2021-09-08 01:54:48 +02:00

1031 lines
30 KiB
Go

package sqleng
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
"github.com/grafana/grafana/pkg/util/errutil"
"xorm.io/core"
"xorm.io/xorm"
)
// MetaKeyExecutedQueryString is the key where the executed query should get stored
const MetaKeyExecutedQueryString = "executedQueryString"
// ErrConnectionFailed is the generic error returned to clients in place of a
// low-level network error (see transformQueryError); the underlying error is
// only written to the server log.
var ErrConnectionFailed = errors.New("failed to connect to server - please inspect Grafana server log for details")
// SQLMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
// timeRange to be able to generate queries that use from and to.
type SQLMacroEngine interface {
// Interpolate returns sql with the data source specific macros expanded, or an
// error if expansion fails. It is called after the global Interpolate step.
Interpolate(query *backend.DataQuery, timeRange backend.TimeRange, sql string) (string, error)
}
// SqlQueryResultTransformer supplies the data source specific pieces used when
// turning query results into data frames: error translation and the string
// converters handed to sqlutil.FrameFromRows.
type SqlQueryResultTransformer interface {
// TransformQueryError transforms a query error.
TransformQueryError(err error) error
// GetConverterList returns the string converters applied when rows are
// converted into a data frame.
GetConverterList() []sqlutil.StringConverter
}
// engineCacheType is a mutex-guarded cache of xorm engines keyed by data source ID.
type engineCacheType struct {
// cache maps a data source ID to the engine created for it.
cache map[int64]*xorm.Engine
// updates maps a data source ID to the DataSourceInfo.Updated value that was
// current when the cached engine was created.
updates map[int64]time.Time
// The embedded mutex must be held while reading or writing either map.
sync.Mutex
}
// engineCache is the package-wide engine cache used by NewQueryDataHandler.
var engineCache = engineCacheType{
cache: make(map[int64]*xorm.Engine),
updates: make(map[int64]time.Time),
}
// sqlIntervalCalculator computes the interval substituted for the
// $__interval and $__interval_ms macros in Interpolate.
var sqlIntervalCalculator = intervalv2.NewCalculator()
// NewXormEngine is an xorm.Engine factory, that can be stubbed by tests.
// It is a package-level variable (rather than a direct call to xorm.NewEngine)
// so that it can be reassigned.
//nolint:gocritic
var NewXormEngine = func(driverName string, connectionString string) (*xorm.Engine, error) {
return xorm.NewEngine(driverName, connectionString)
}
// JsonData holds the data source settings decoded from the data source's JSON
// configuration. Only the fields noted below are consumed in this file; the
// rest are presumably read by the individual SQL data source packages.
type JsonData struct {
// MaxOpenConns is passed to the engine's SetMaxOpenConns.
MaxOpenConns int `json:"maxOpenConns"`
// MaxIdleConns is passed to the engine's SetMaxIdleConns.
MaxIdleConns int `json:"maxIdleConns"`
// ConnMaxLifetime is a number of seconds, passed to the engine's SetConnMaxLifetime.
ConnMaxLifetime int `json:"connMaxLifetime"`
Timescaledb bool `json:"timescaledb"`
Mode string `json:"sslmode"`
ConfigurationMethod string `json:"tlsConfigurationMethod"`
RootCertFile string `json:"sslRootCertFile"`
CertFile string `json:"sslCertFile"`
CertKeyFile string `json:"sslKeyFile"`
Timezone string `json:"timezone"`
Encrypt string `json:"encrypt"`
// TimeInterval is the configured minimum interval handed to Interpolate.
TimeInterval string `json:"timeInterval"`
}
// DataSourceInfo describes one configured SQL data source instance.
type DataSourceInfo struct {
// JsonData holds the decoded JSON settings of the data source.
JsonData JsonData
URL string
User string
Database string
// ID is the key under which the data source's engine is cached.
ID int64
// Updated is compared against the cached engine's timestamp in
// NewQueryDataHandler to decide whether the cached engine can be reused.
Updated time.Time
UID string
DecryptedSecureJSONData map[string]string
}
// DataPluginConfiguration is the input to NewQueryDataHandler.
type DataPluginConfiguration struct {
// DriverName and ConnectionString are passed to NewXormEngine.
DriverName string
// DSInfo identifies the data source and carries its settings.
DSInfo DataSourceInfo
ConnectionString string
// TimeColumnNames overrides the default time column name ("time") when non-empty.
TimeColumnNames []string
// MetricColumnTypes lists database column type names that mark a column as
// the metric column when no column is literally named "metric".
MetricColumnTypes []string
}
// DataSourceHandler executes queries against one SQL data source and converts
// the results into data frames. Create it with NewQueryDataHandler.
type DataSourceHandler struct {
// macroEngine expands data source specific macros.
macroEngine SQLMacroEngine
// queryResultTransformer translates errors and supplies string converters.
queryResultTransformer SqlQueryResultTransformer
// engine is the xorm engine used to open sessions.
engine *xorm.Engine
// timeColumnNames are the column names treated as the time column.
timeColumnNames []string
// metricColumnTypes are the database type names treated as metric columns.
metricColumnTypes []string
log log.Logger
dsInfo DataSourceInfo
}
// QueryJson is the query model decoded from backend.DataQuery.JSON.
type QueryJson struct {
// RawSql is the SQL text before macro interpolation; queries with an empty
// RawSql are skipped by QueryData.
RawSql string `json:"rawSql"`
// Fill enables filling of missing data points.
Fill bool `json:"fill"`
// FillInterval is the fill interval in seconds.
FillInterval float64 `json:"fillInterval"`
// FillMode is matched case-insensitively against "null", "previous" and "value".
FillMode string `json:"fillMode"`
// FillValue is the value used when FillMode is "value".
FillValue float64 `json:"fillValue"`
// Format is "time_series" or "table".
Format string `json:"format"`
}
// transformQueryError maps a raw query error to the error reported to the client.
//
// Errors that wrap a *net.OpError carry the operation, network type and
// address of the failed connection. For security reasons those details are
// written to the server log only and the generic ErrConnectionFailed is
// returned instead. Every other error is delegated to the data source
// specific transformer.
func (e *DataSourceHandler) transformQueryError(err error) error {
	var netErr *net.OpError
	if !errors.As(err, &netErr) {
		return e.queryResultTransformer.TransformQueryError(err)
	}

	e.log.Error("query error", "err", err)
	return ErrConnectionFailed
}
// NewQueryDataHandler builds a DataSourceHandler for the data source described
// by config.
//
// The time column defaults to "time" unless config.TimeColumnNames is
// non-empty. Engines are cached per data source ID: a cached engine is reused
// as long as it was created from settings at least as recent as
// config.DSInfo.Updated; otherwise a new engine is created through
// NewXormEngine, configured with the connection pool limits from the JSON
// settings, and stored in the cache.
//
// It returns an error only if the engine cannot be created.
func NewQueryDataHandler(config DataPluginConfiguration, queryResultTransformer SqlQueryResultTransformer,
	macroEngine SQLMacroEngine, log log.Logger) (*DataSourceHandler, error) {
	queryDataHandler := DataSourceHandler{
		queryResultTransformer: queryResultTransformer,
		macroEngine:            macroEngine,
		timeColumnNames:        []string{"time"},
		log:                    log,
		dsInfo:                 config.DSInfo,
	}

	if len(config.TimeColumnNames) > 0 {
		queryDataHandler.timeColumnNames = config.TimeColumnNames
	}

	if len(config.MetricColumnTypes) > 0 {
		queryDataHandler.metricColumnTypes = config.MetricColumnTypes
	}

	engineCache.Lock()
	defer engineCache.Unlock()

	if engine, present := engineCache.cache[config.DSInfo.ID]; present {
		// The cached engine is still valid only when the data source has NOT been
		// updated since the engine was created. The previous check was inverted:
		// it reused the engine after an update (keeping a stale connection string)
		// and rebuilt it on every call when nothing had changed.
		if updateTime := engineCache.updates[config.DSInfo.ID]; !updateTime.Before(config.DSInfo.Updated) {
			queryDataHandler.engine = engine
			return &queryDataHandler, nil
		}
		// NOTE(review): the stale engine is replaced below but not closed, because
		// handlers created earlier may still be using it — confirm who owns its shutdown.
	}

	engine, err := NewXormEngine(config.DriverName, config.ConnectionString)
	if err != nil {
		return nil, err
	}

	engine.SetMaxOpenConns(config.DSInfo.JsonData.MaxOpenConns)
	engine.SetMaxIdleConns(config.DSInfo.JsonData.MaxIdleConns)
	engine.SetConnMaxLifetime(time.Duration(config.DSInfo.JsonData.ConnMaxLifetime) * time.Second)

	engineCache.updates[config.DSInfo.ID] = config.DSInfo.Updated
	engineCache.cache[config.DSInfo.ID] = engine
	queryDataHandler.engine = engine
	return &queryDataHandler, nil
}
// rowLimit is the row limit passed to sqlutil.FrameFromRows when converting
// query results into a data frame.
const rowLimit = 1000000
// DBDataResponse pairs the response of one query with the RefID of the query
// that produced it; it is the message type sent from executeQuery to QueryData.
type DBDataResponse struct {
dataResponse backend.DataResponse
refID string
}
// QueryData executes every query of req concurrently and gathers the results
// into one response keyed by RefID.
//
// Each query model is decoded with "time_series" as the default format.
// Queries whose rawSql is empty are skipped and produce no response entry. A
// query model that cannot be decoded aborts the whole request with an error;
// failures of individual queries are reported on that query's DataResponse.
func (e *DataSourceHandler) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	var pending sync.WaitGroup
	// Buffered for one message per query, so workers never block on send.
	results := make(chan DBDataResponse, len(req.Queries))
	response := backend.NewQueryDataResponse()

	for idx := range req.Queries {
		dataQuery := req.Queries[idx]
		model := QueryJson{Format: "time_series"}
		if err := json.Unmarshal(dataQuery.JSON, &model); err != nil {
			return nil, fmt.Errorf("error unmarshal query json: %w", err)
		}
		if model.RawSql != "" {
			pending.Add(1)
			go e.executeQuery(dataQuery, &pending, ctx, results, model)
		}
	}

	// All workers have finished once Wait returns, so the channel can be closed
	// and drained without racing against senders.
	pending.Wait()
	close(results)

	collected := make(map[string]backend.DataResponse)
	for item := range results {
		collected[item.refID] = item.dataResponse
	}
	response.Responses = collected
	return response, nil
}
// executeQuery runs a single query and sends exactly one DBDataResponse for it
// on ch. It is meant to be started as a goroutine by QueryData.
//
// Parameters:
//   - query: the query to run; its RefID labels the response.
//   - wg: marked Done when the function returns.
//   - queryContext: request context; cancelling it cancels the database query.
//   - ch: channel the response is sent on.
//   - queryJson: the already decoded query model; RawSql must be non-empty.
//
// Steps: global macro interpolation, data source specific interpolation, query
// execution, conversion of the rows into a data frame, time column conversion
// and, for time series, value conversion to float, long-to-wide conversion and
// optional resampling. Failures are reported on the response's Error field
// together with an empty frame carrying the executed query. Panics are
// recovered, logged and reported the same way.
func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitGroup, queryContext context.Context,
	ch chan DBDataResponse, queryJson QueryJson) {
	defer wg.Done()
	queryResult := DBDataResponse{
		dataResponse: backend.DataResponse{},
		refID:        query.RefID,
	}

	defer func() {
		if r := recover(); r != nil {
			e.log.Error("executeQuery panic", "error", r, "stack", log.Stack(1))
			switch recovered := r.(type) {
			case error:
				queryResult.dataResponse.Error = recovered
			case string:
				// errors.New instead of fmt.Errorf: the panic text is data, not a
				// format string, so a '%' in it must not be interpreted.
				queryResult.dataResponse.Error = errors.New(recovered)
			default:
				queryResult.dataResponse.Error = fmt.Errorf("unexpected error, see the server log for details")
			}
			ch <- queryResult
		}
	}()

	if queryJson.RawSql == "" {
		panic("Query model property rawSql should not be empty at this point")
	}

	timeRange := query.TimeRange

	// errAppendDebug reports a failure together with the query text that was
	// (about to be) executed, so it can be inspected in the query inspector.
	errAppendDebug := func(frameErr string, err error, query string) {
		var emptyFrame data.Frame
		emptyFrame.SetMeta(&data.FrameMeta{
			ExecutedQueryString: query,
		})
		queryResult.dataResponse.Error = fmt.Errorf("%s: %w", frameErr, err)
		queryResult.dataResponse.Frames = data.Frames{&emptyFrame}
		ch <- queryResult
	}

	// global substitutions
	interpolatedQuery, err := Interpolate(query, timeRange, e.dsInfo.JsonData.TimeInterval, queryJson.RawSql)
	if err != nil {
		errAppendDebug("interpolation failed", e.transformQueryError(err), interpolatedQuery)
		return
	}

	// data source specific substitutions
	interpolatedQuery, err = e.macroEngine.Interpolate(&query, timeRange, interpolatedQuery)
	if err != nil {
		errAppendDebug("interpolation failed", e.transformQueryError(err), interpolatedQuery)
		return
	}

	session := e.engine.NewSession()
	defer session.Close()
	db := session.DB()

	// Use the request context so that a cancelled or timed-out request also
	// cancels the running database query.
	rows, err := db.QueryContext(queryContext, interpolatedQuery)
	if err != nil {
		errAppendDebug("db query error", e.transformQueryError(err), interpolatedQuery)
		return
	}
	defer func() {
		if err := rows.Close(); err != nil {
			e.log.Warn("Failed to close rows", "err", err)
		}
	}()

	qm, err := e.newProcessCfg(query, queryContext, rows, interpolatedQuery)
	if err != nil {
		errAppendDebug("failed to get configurations", err, interpolatedQuery)
		return
	}

	// Convert row.Rows to dataframe
	stringConverters := e.queryResultTransformer.GetConverterList()
	frame, err := sqlutil.FrameFromRows(rows.Rows, rowLimit, sqlutil.ToConverters(stringConverters...)...)
	if err != nil {
		errAppendDebug("convert frame from rows error", err, interpolatedQuery)
		return
	}

	frame.SetMeta(&data.FrameMeta{
		ExecutedQueryString: interpolatedQuery,
	})

	// If no rows were returned, no point checking anything else.
	if frame.Rows() == 0 {
		queryResult.dataResponse.Frames = data.Frames{frame}
		ch <- queryResult
		return
	}

	if err := convertSQLTimeColumnsToEpochMS(frame, qm); err != nil {
		errAppendDebug("converting time columns failed", err, interpolatedQuery)
		return
	}

	if qm.Format == dataQueryFormatSeries {
		// time series has to have time column
		if qm.timeIndex == -1 {
			errAppendDebug("db has no time column", errors.New("no time column found"), interpolatedQuery)
			return
		}

		// Make sure to name the time field 'Time' to be backward compatible with Grafana pre-v8.
		frame.Fields[qm.timeIndex].Name = data.TimeSeriesTimeFieldName

		// Every column that is neither the time column, the metric column nor a
		// string column is a value column and must be float for time series.
		for i := range qm.columnNames {
			if i == qm.timeIndex || i == qm.metricIndex {
				continue
			}

			if t := frame.Fields[i].Type(); t == data.FieldTypeString || t == data.FieldTypeNullableString {
				continue
			}

			if frame, err = convertSQLValueColumnToFloat(frame, i); err != nil {
				errAppendDebug("convert value to float failed", err, interpolatedQuery)
				return
			}
		}

		tsSchema := frame.TimeSeriesSchema()
		if tsSchema.Type == data.TimeSeriesTypeLong {
			originalData := frame
			frame, err = data.LongToWide(frame, qm.FillMissing)
			if err != nil {
				errAppendDebug("failed to convert long to wide series when converting from dataframe", err, interpolatedQuery)
				return
			}

			// Before 8x, a special metric column was used to name time series. The LongToWide transforms that into a metric label on the value field.
			// But that makes series name have both the value column name AND the metric name. So here we are removing the metric label here and moving it to the
			// field name to get the same naming for the series as pre v8
			if len(originalData.Fields) == 3 {
				for _, field := range frame.Fields {
					if len(field.Labels) == 1 { // 7x only supported one label
						name, ok := field.Labels["metric"]
						if ok {
							field.Name = name
							field.Labels = nil
						}
					}
				}
			}
		}

		if qm.FillMissing != nil {
			// A resampling failure is not fatal: it is logged and surfaced to the
			// user as a warning notice on the frame.
			frame, err = resample(frame, *qm)
			if err != nil {
				e.log.Error("Failed to resample dataframe", "err", err)
				frame.AppendNotices(data.Notice{Text: "Failed to resample dataframe", Severity: data.NoticeSeverityWarning})
			}
		}
	}

	queryResult.dataResponse.Frames = data.Frames{frame}
	ch <- queryResult
}
// Interpolate expands the macros shared by all SQL data sources:
// $__interval_ms, $__interval, $__unixEpochFrom() and $__unixEpochTo().
//
// The interval is calculated from timeRange with a minimum derived from
// timeInterval and the query's own interval. It returns an empty string and
// the error if the interval cannot be determined. It is a variable so that it
// can be replaced.
var Interpolate = func(query backend.DataQuery, timeRange backend.TimeRange, timeInterval string, sql string) (string, error) {
	minInterval, err := intervalv2.GetIntervalFrom(timeInterval, query.Interval.String(), query.Interval.Milliseconds(), time.Second*60)
	if err != nil {
		return "", err
	}

	interval, err := sqlIntervalCalculator.Calculate(timeRange, minInterval, "min")
	if err != nil {
		return "", err
	}

	// Order matters: $__interval_ms must be replaced before its prefix $__interval.
	substitutions := [][2]string{
		{"$__interval_ms", strconv.FormatInt(interval.Milliseconds(), 10)},
		{"$__interval", interval.Text},
		{"$__unixEpochFrom()", strconv.FormatInt(timeRange.From.UTC().Unix(), 10)},
		{"$__unixEpochTo()", strconv.FormatInt(timeRange.To.UTC().Unix(), 10)},
	}
	for _, pair := range substitutions {
		sql = strings.ReplaceAll(sql, pair[0], pair[1])
	}
	return sql, nil
}
// newProcessCfg builds the dataQueryModel used to post-process the result of
// one query.
//
// It records the result's column names and types, decodes the fill settings
// and the format from the query model, copies the query's time range in UTC,
// and locates the time, timeend (table format only) and metric columns. The
// query model is decoded with the same "time_series" default format that
// QueryData uses.
//
// It returns an error if the column metadata cannot be read, the query model
// cannot be decoded, or the format is neither "time_series" nor "table".
//nolint: staticcheck // plugins.DataPlugin deprecated
func (e *DataSourceHandler) newProcessCfg(query backend.DataQuery, queryContext context.Context,
	rows *core.Rows, interpolatedQuery string) (*dataQueryModel, error) {
	columnNames, err := rows.Columns()
	if err != nil {
		return nil, err
	}
	columnTypes, err := rows.ColumnTypes()
	if err != nil {
		return nil, err
	}

	qm := &dataQueryModel{
		columnTypes:  columnTypes,
		columnNames:  columnNames,
		rows:         rows,
		timeIndex:    -1,
		timeEndIndex: -1,
		metricIndex:  -1,
		metricPrefix: false,
		queryContext: queryContext,
	}

	// Same defaults as QueryData, so a model without "format" is handled the
	// same way in both places instead of being rejected here.
	queryJson := QueryJson{
		Fill:   false,
		Format: "time_series",
	}
	if err := json.Unmarshal(query.JSON, &queryJson); err != nil {
		return nil, err
	}

	if queryJson.Fill {
		qm.FillMissing = &data.FillMissing{}
		qm.Interval = time.Duration(queryJson.FillInterval * float64(time.Second))
		switch strings.ToLower(queryJson.FillMode) {
		case "null":
			qm.FillMissing.Mode = data.FillModeNull
		case "previous":
			qm.FillMissing.Mode = data.FillModePrevious
		case "value":
			qm.FillMissing.Mode = data.FillModeValue
			qm.FillMissing.Value = queryJson.FillValue
		default:
			// Unknown fill mode: keep the zero value of FillMissing.Mode.
		}
	}

	qm.TimeRange.From = query.TimeRange.From.UTC()
	qm.TimeRange.To = query.TimeRange.To.UTC()

	switch queryJson.Format {
	case "time_series":
		qm.Format = dataQueryFormatSeries
	case "table":
		qm.Format = dataQueryFormatTable
	default:
		// Reported as an ordinary error rather than a panic; the caller turns
		// it into an error on the query's response.
		return nil, fmt.Errorf("unrecognized query model format: %q", queryJson.Format)
	}

	for i, col := range qm.columnNames {
		for _, tc := range e.timeColumnNames {
			if col == tc {
				qm.timeIndex = i
				break
			}
		}

		if qm.Format == dataQueryFormatTable && col == "timeend" {
			qm.timeEndIndex = i
			continue
		}

		switch col {
		case "metric":
			qm.metricIndex = i
		default:
			// Without an explicit "metric" column so far, a column whose database
			// type is one of the configured metric types becomes the metric column.
			if qm.metricIndex == -1 {
				columnType := qm.columnTypes[i].DatabaseTypeName()
				for _, mct := range e.metricColumnTypes {
					if columnType == mct {
						qm.metricIndex = i
						break
					}
				}
			}
		}
	}

	qm.InterpolatedQuery = interpolatedQuery
	return qm, nil
}
// dataQueryFormat is the type of query.
type dataQueryFormat string
const (
// dataQueryFormatTable identifies a table query.
// NOTE(review): this was previously described as the default, but QueryData
// decodes query models with "time_series" as the default format.
dataQueryFormatTable dataQueryFormat = "table"
// dataQueryFormatSeries identifies a time series query.
dataQueryFormatSeries dataQueryFormat = "time_series"
)
// dataQueryModel carries everything needed to post-process the result of one
// query. It is created by newProcessCfg.
type dataQueryModel struct {
InterpolatedQuery string // property not set until after Interpolate()
// Format is the requested result format (table or time series).
Format dataQueryFormat
// TimeRange is the query's time range, converted to UTC.
TimeRange backend.TimeRange
FillMissing *data.FillMissing // property not set until after Interpolate()
// Interval is the fill interval; only set when filling is enabled.
Interval time.Duration
// columnNames and columnTypes describe the result columns, in result order.
columnNames []string
columnTypes []*sql.ColumnType
// timeIndex, timeEndIndex and metricIndex are the positions of the time,
// timeend and metric columns, or -1 when the column is absent.
timeIndex int
timeEndIndex int
metricIndex int
rows *core.Rows
metricPrefix bool
queryContext context.Context
}
// convertInt64ToFloat64 appends every int64 value of origin to newField as a
// *float64. It panics if origin holds a different element type.
func convertInt64ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		converted := float64(origin.At(row).(int64))
		newField.Append(&converted)
	}
}
// convertNullableInt64ToFloat64 appends every *int64 value of origin to
// newField as a *float64, keeping nil values nil.
func convertNullableInt64ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*int64)
		if ptr != nil {
			converted := float64(*ptr)
			newField.Append(&converted)
			continue
		}
		newField.Append(nil)
	}
}
// convertUInt64ToFloat64 appends every uint64 value of origin to newField as a
// *float64. It panics if origin holds a different element type.
func convertUInt64ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		converted := float64(origin.At(row).(uint64))
		newField.Append(&converted)
	}
}
// convertNullableUInt64ToFloat64 appends every *uint64 value of origin to
// newField as a *float64, keeping nil values nil.
func convertNullableUInt64ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*uint64)
		if ptr != nil {
			converted := float64(*ptr)
			newField.Append(&converted)
			continue
		}
		newField.Append(nil)
	}
}
// convertInt32ToFloat64 appends every int32 value of origin to newField as a
// *float64. It panics if origin holds a different element type.
func convertInt32ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		converted := float64(origin.At(row).(int32))
		newField.Append(&converted)
	}
}
// convertNullableInt32ToFloat64 appends every *int32 value of origin to
// newField as a *float64, keeping nil values nil.
func convertNullableInt32ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*int32)
		if ptr != nil {
			converted := float64(*ptr)
			newField.Append(&converted)
			continue
		}
		newField.Append(nil)
	}
}
// convertUInt32ToFloat64 appends every uint32 value of origin to newField as a
// *float64. It panics if origin holds a different element type.
func convertUInt32ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		converted := float64(origin.At(row).(uint32))
		newField.Append(&converted)
	}
}
// convertNullableUInt32ToFloat64 appends every *uint32 value of origin to
// newField as a *float64, keeping nil values nil.
func convertNullableUInt32ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*uint32)
		if ptr != nil {
			converted := float64(*ptr)
			newField.Append(&converted)
			continue
		}
		newField.Append(nil)
	}
}
// convertInt16ToFloat64 appends every int16 value of origin to newField as a
// *float64. It panics if origin holds a different element type.
func convertInt16ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		converted := float64(origin.At(row).(int16))
		newField.Append(&converted)
	}
}
// convertNullableInt16ToFloat64 appends every *int16 value of origin to
// newField as a *float64, keeping nil values nil.
func convertNullableInt16ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*int16)
		if ptr != nil {
			converted := float64(*ptr)
			newField.Append(&converted)
			continue
		}
		newField.Append(nil)
	}
}
// convertUInt16ToFloat64 appends every uint16 value of origin to newField as a
// *float64. It panics if origin holds a different element type.
func convertUInt16ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		converted := float64(origin.At(row).(uint16))
		newField.Append(&converted)
	}
}
// convertNullableUInt16ToFloat64 appends every *uint16 value of origin to
// newField as a *float64, keeping nil values nil.
func convertNullableUInt16ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*uint16)
		if ptr != nil {
			converted := float64(*ptr)
			newField.Append(&converted)
			continue
		}
		newField.Append(nil)
	}
}
// convertInt8ToFloat64 appends every int8 value of origin to newField as a
// *float64. It panics if origin holds a different element type.
func convertInt8ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		converted := float64(origin.At(row).(int8))
		newField.Append(&converted)
	}
}
// convertNullableInt8ToFloat64 appends every *int8 value of origin to newField
// as a *float64, keeping nil values nil.
func convertNullableInt8ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*int8)
		if ptr != nil {
			converted := float64(*ptr)
			newField.Append(&converted)
			continue
		}
		newField.Append(nil)
	}
}
// convertUInt8ToFloat64 appends every uint8 value of origin to newField as a
// *float64. It panics if origin holds a different element type.
func convertUInt8ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		converted := float64(origin.At(row).(uint8))
		newField.Append(&converted)
	}
}
// convertNullableUInt8ToFloat64 appends every *uint8 value of origin to
// newField as a *float64, keeping nil values nil.
func convertNullableUInt8ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*uint8)
		if ptr != nil {
			converted := float64(*ptr)
			newField.Append(&converted)
			continue
		}
		newField.Append(nil)
	}
}
// convertUnknownToZero appends one zero-valued *float64 to newField for every
// value in origin; the values of origin themselves are not read.
func convertUnknownToZero(origin *data.Field, newField *data.Field) {
	for remaining := origin.Len(); remaining > 0; remaining-- {
		zero := float64(0)
		newField.Append(&zero)
	}
}
// convertNullableFloat32ToFloat64 appends every *float32 value of origin to
// newField as a *float64, keeping nil values nil.
func convertNullableFloat32ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*float32)
		if ptr != nil {
			converted := float64(*ptr)
			newField.Append(&converted)
			continue
		}
		newField.Append(nil)
	}
}
// convertFloat32ToFloat64 appends every float32 value of origin to newField as
// a *float64. It panics if origin holds a different element type.
func convertFloat32ToFloat64(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		converted := float64(origin.At(row).(float32))
		newField.Append(&converted)
	}
}
// convertInt64ToEpochMS appends every int64 epoch value of origin to newField
// as a *time.Time. Each value is normalized by epochPrecisionToMS and then
// interpreted as milliseconds since the Unix epoch.
func convertInt64ToEpochMS(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		raw := origin.At(row).(int64)
		millis := int64(epochPrecisionToMS(float64(raw)))
		ts := time.Unix(0, millis*int64(time.Millisecond))
		newField.Append(&ts)
	}
}
// convertNullableInt64ToEpochMS appends every *int64 epoch value of origin to
// newField as a *time.Time, keeping nil values nil. Non-nil values are
// normalized by epochPrecisionToMS and interpreted as milliseconds since the
// Unix epoch.
func convertNullableInt64ToEpochMS(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*int64)
		if ptr != nil {
			millis := int64(epochPrecisionToMS(float64(*ptr)))
			ts := time.Unix(0, millis*int64(time.Millisecond))
			newField.Append(&ts)
			continue
		}
		newField.Append(nil)
	}
}
// convertUInt64ToEpochMS appends every uint64 epoch value of origin to
// newField as a *time.Time. Each value is normalized by epochPrecisionToMS and
// then interpreted as milliseconds since the Unix epoch.
func convertUInt64ToEpochMS(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		raw := origin.At(row).(uint64)
		millis := int64(epochPrecisionToMS(float64(raw)))
		ts := time.Unix(0, millis*int64(time.Millisecond))
		newField.Append(&ts)
	}
}
// convertNullableUInt64ToEpochMS appends every *uint64 epoch value of origin
// to newField as a *time.Time, keeping nil values nil. Non-nil values are
// normalized by epochPrecisionToMS and interpreted as milliseconds since the
// Unix epoch.
func convertNullableUInt64ToEpochMS(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*uint64)
		if ptr != nil {
			millis := int64(epochPrecisionToMS(float64(*ptr)))
			ts := time.Unix(0, millis*int64(time.Millisecond))
			newField.Append(&ts)
			continue
		}
		newField.Append(nil)
	}
}
// convertInt32ToEpochMS appends every int32 epoch value of origin to newField
// as a *time.Time. Each value is normalized by epochPrecisionToMS and then
// interpreted as milliseconds since the Unix epoch.
func convertInt32ToEpochMS(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		raw := origin.At(row).(int32)
		millis := int64(epochPrecisionToMS(float64(raw)))
		ts := time.Unix(0, millis*int64(time.Millisecond))
		newField.Append(&ts)
	}
}
// convertNullableInt32ToEpochMS appends every *int32 epoch value of origin to
// newField as a *time.Time, keeping nil values nil. Non-nil values are
// normalized by epochPrecisionToMS and interpreted as milliseconds since the
// Unix epoch.
func convertNullableInt32ToEpochMS(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*int32)
		if ptr != nil {
			millis := int64(epochPrecisionToMS(float64(*ptr)))
			ts := time.Unix(0, millis*int64(time.Millisecond))
			newField.Append(&ts)
			continue
		}
		newField.Append(nil)
	}
}
// convertUInt32ToEpochMS appends every uint32 epoch value of origin to
// newField as a *time.Time. Each value is normalized by epochPrecisionToMS and
// then interpreted as milliseconds since the Unix epoch.
func convertUInt32ToEpochMS(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		raw := origin.At(row).(uint32)
		millis := int64(epochPrecisionToMS(float64(raw)))
		ts := time.Unix(0, millis*int64(time.Millisecond))
		newField.Append(&ts)
	}
}
// convertNullableUInt32ToEpochMS appends every *uint32 epoch value of origin
// to newField as a *time.Time, keeping nil values nil. Non-nil values are
// normalized by epochPrecisionToMS and interpreted as milliseconds since the
// Unix epoch.
func convertNullableUInt32ToEpochMS(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*uint32)
		if ptr != nil {
			millis := int64(epochPrecisionToMS(float64(*ptr)))
			ts := time.Unix(0, millis*int64(time.Millisecond))
			newField.Append(&ts)
			continue
		}
		newField.Append(nil)
	}
}
// convertFloat64ToEpochMS appends every float64 epoch value of origin to
// newField as a *time.Time. Each value is normalized by epochPrecisionToMS,
// truncated to an integer and interpreted as milliseconds since the Unix epoch.
func convertFloat64ToEpochMS(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		raw := origin.At(row).(float64)
		millis := int64(epochPrecisionToMS(raw))
		ts := time.Unix(0, millis*int64(time.Millisecond))
		newField.Append(&ts)
	}
}
// convertNullableFloat64ToEpochMS appends every *float64 epoch value of origin
// to newField as a *time.Time, keeping nil values nil. Non-nil values are
// normalized by epochPrecisionToMS, truncated to an integer and interpreted as
// milliseconds since the Unix epoch.
func convertNullableFloat64ToEpochMS(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*float64)
		if ptr != nil {
			millis := int64(epochPrecisionToMS(*ptr))
			ts := time.Unix(0, millis*int64(time.Millisecond))
			newField.Append(&ts)
			continue
		}
		newField.Append(nil)
	}
}
// convertFloat32ToEpochMS appends every float32 epoch value of origin to
// newField as a *time.Time. Each value is normalized by epochPrecisionToMS,
// truncated to an integer and interpreted as milliseconds since the Unix epoch.
func convertFloat32ToEpochMS(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		raw := origin.At(row).(float32)
		millis := int64(epochPrecisionToMS(float64(raw)))
		ts := time.Unix(0, millis*int64(time.Millisecond))
		newField.Append(&ts)
	}
}
// convertNullableFloat32ToEpochMS appends every *float32 epoch value of origin
// to newField as a *time.Time, keeping nil values nil. Non-nil values are
// normalized by epochPrecisionToMS, truncated to an integer and interpreted as
// milliseconds since the Unix epoch.
func convertNullableFloat32ToEpochMS(origin *data.Field, newField *data.Field) {
	for row, n := 0, origin.Len(); row < n; row++ {
		ptr := origin.At(row).(*float32)
		if ptr != nil {
			millis := int64(epochPrecisionToMS(float64(*ptr)))
			ts := time.Unix(0, millis*int64(time.Millisecond))
			newField.Append(&ts)
			continue
		}
		newField.Append(nil)
	}
}
// convertSQLTimeColumnsToEpochMS converts the time column and then the timeend
// column of frame to time fields, for whichever of the two qm has located
// (index other than -1). It stops at the first failure and returns it wrapped
// with a message naming the column.
func convertSQLTimeColumnsToEpochMS(frame *data.Frame, qm *dataQueryModel) error {
	targets := []struct {
		index   int
		failure string
	}{
		{qm.timeIndex, "failed to convert time column"},
		{qm.timeEndIndex, "failed to convert timeend column"},
	}

	for _, target := range targets {
		if target.index == -1 {
			continue
		}
		if err := convertSQLTimeColumnToEpochMS(frame, target.index); err != nil {
			return errutil.Wrap(target.failure, err)
		}
	}
	return nil
}
// convertSQLTimeColumnToEpochMS replaces the numeric field at timeIndex with a
// nullable time field so that native datetime types and epoch values both work
// in annotation and table queries.
//
// A field that already has a time type is left untouched. 32/64-bit signed and
// unsigned integers and 32/64-bit floats (nullable or not) are converted; the
// new field keeps the name and labels of the original. It returns an error if
// timeIndex is out of range or the field type is not convertible, in which
// case frame is not modified.
func convertSQLTimeColumnToEpochMS(frame *data.Frame, timeIndex int) error {
	if timeIndex < 0 || timeIndex >= len(frame.Fields) {
		return fmt.Errorf("timeIndex %d is out of range", timeIndex)
	}

	origin := frame.Fields[timeIndex]
	valueType := origin.Type()

	// Pick the converter matching the element type of the original field.
	var convert func(origin *data.Field, newField *data.Field)
	switch valueType {
	case data.FieldTypeTime, data.FieldTypeNullableTime:
		return nil
	case data.FieldTypeInt64:
		convert = convertInt64ToEpochMS
	case data.FieldTypeNullableInt64:
		convert = convertNullableInt64ToEpochMS
	case data.FieldTypeUint64:
		convert = convertUInt64ToEpochMS
	case data.FieldTypeNullableUint64:
		convert = convertNullableUInt64ToEpochMS
	case data.FieldTypeInt32:
		convert = convertInt32ToEpochMS
	case data.FieldTypeNullableInt32:
		convert = convertNullableInt32ToEpochMS
	case data.FieldTypeUint32:
		convert = convertUInt32ToEpochMS
	case data.FieldTypeNullableUint32:
		convert = convertNullableUInt32ToEpochMS
	case data.FieldTypeFloat64:
		convert = convertFloat64ToEpochMS
	case data.FieldTypeNullableFloat64:
		convert = convertNullableFloat64ToEpochMS
	case data.FieldTypeFloat32:
		convert = convertFloat32ToEpochMS
	case data.FieldTypeNullableFloat32:
		convert = convertNullableFloat32ToEpochMS
	default:
		return fmt.Errorf("column type %q is not convertible to time.Time", valueType)
	}

	replacement := data.NewFieldFromFieldType(data.FieldTypeNullableTime, 0)
	replacement.Name = origin.Name
	replacement.Labels = origin.Labels
	convert(origin, replacement)

	frame.Fields[timeIndex] = replacement
	return nil
}
// convertSQLValueColumnToFloat replaces the field at Index with a nullable
// float64 field holding the same values, as required for time series value
// columns.
//
// A field that is already float64 (nullable or not) is left untouched. Integer
// fields of every width and float32 fields are converted; the new field keeps
// the name and labels of the original. For any other type the field is
// replaced by a column of zeros and an error is returned as well. An
// out-of-range Index yields an error and leaves frame unchanged. The frame
// passed in is always the frame returned.
//nolint: gocyclo
func convertSQLValueColumnToFloat(frame *data.Frame, Index int) (*data.Frame, error) {
	if Index < 0 || Index >= len(frame.Fields) {
		return frame, fmt.Errorf("metricIndex %d is out of range", Index)
	}

	origin := frame.Fields[Index]
	valueType := origin.Type()

	// Pick the converter matching the element type of the original field.
	var (
		convert     func(origin *data.Field, newField *data.Field)
		convertible = true
	)
	switch valueType {
	case data.FieldTypeFloat64, data.FieldTypeNullableFloat64:
		return frame, nil
	case data.FieldTypeInt64:
		convert = convertInt64ToFloat64
	case data.FieldTypeNullableInt64:
		convert = convertNullableInt64ToFloat64
	case data.FieldTypeUint64:
		convert = convertUInt64ToFloat64
	case data.FieldTypeNullableUint64:
		convert = convertNullableUInt64ToFloat64
	case data.FieldTypeInt32:
		convert = convertInt32ToFloat64
	case data.FieldTypeNullableInt32:
		convert = convertNullableInt32ToFloat64
	case data.FieldTypeUint32:
		convert = convertUInt32ToFloat64
	case data.FieldTypeNullableUint32:
		convert = convertNullableUInt32ToFloat64
	case data.FieldTypeInt16:
		convert = convertInt16ToFloat64
	case data.FieldTypeNullableInt16:
		convert = convertNullableInt16ToFloat64
	case data.FieldTypeUint16:
		convert = convertUInt16ToFloat64
	case data.FieldTypeNullableUint16:
		convert = convertNullableUInt16ToFloat64
	case data.FieldTypeInt8:
		convert = convertInt8ToFloat64
	case data.FieldTypeNullableInt8:
		convert = convertNullableInt8ToFloat64
	case data.FieldTypeUint8:
		convert = convertUInt8ToFloat64
	case data.FieldTypeNullableUint8:
		convert = convertNullableUInt8ToFloat64
	case data.FieldTypeFloat32:
		convert = convertFloat32ToFloat64
	case data.FieldTypeNullableFloat32:
		convert = convertNullableFloat32ToFloat64
	default:
		// Unsupported type: the column is still replaced (by zeros) before the
		// error is reported.
		convert, convertible = convertUnknownToZero, false
	}

	replacement := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, 0)
	replacement.Name = origin.Name
	replacement.Labels = origin.Labels
	convert(origin, replacement)
	frame.Fields[Index] = replacement

	if !convertible {
		return frame, fmt.Errorf("metricIndex %d type %s can't be converted to float", Index, valueType)
	}
	return frame, nil
}
// SetupFillmode rewrites the JSON model of query so that missing data points
// are filled.
//
// It sets "fill" to true, "fillInterval" to interval in seconds and "fillMode"
// according to fillmode: "NULL" selects null filling, "previous" selects
// previous-value filling, and anything else is parsed as a float and used as a
// constant fill value. All other properties of the model are preserved.
//
// It returns an error if the existing model cannot be decoded, fillmode is not
// a valid float, or the updated model cannot be encoded. query.JSON is only
// modified on success.
func SetupFillmode(query *backend.DataQuery, interval time.Duration, fillmode string) error {
	rawQueryProp := make(map[string]interface{})
	queryBytes, err := query.JSON.MarshalJSON()
	if err != nil {
		return err
	}
	if err := json.Unmarshal(queryBytes, &rawQueryProp); err != nil {
		return err
	}
	// Decoding JSON null (which is what a nil query.JSON marshals to) resets the
	// map to nil; writing to it would panic.
	if rawQueryProp == nil {
		rawQueryProp = make(map[string]interface{})
	}

	rawQueryProp["fill"] = true
	rawQueryProp["fillInterval"] = interval.Seconds()

	switch fillmode {
	case "NULL":
		rawQueryProp["fillMode"] = "null"
	case "previous":
		rawQueryProp["fillMode"] = "previous"
	default:
		rawQueryProp["fillMode"] = "value"
		floatVal, err := strconv.ParseFloat(fillmode, 64)
		if err != nil {
			return fmt.Errorf("error parsing fill value %v: %w", fillmode, err)
		}
		rawQueryProp["fillValue"] = floatVal
	}

	// Encode into a temporary first: encoding can fail (e.g. a NaN or Inf fill
	// value) and must not wipe the caller's original model.
	updated, err := json.Marshal(rawQueryProp)
	if err != nil {
		return err
	}
	query.JSON = updated

	return nil
}
type SQLMacroEngineBase struct{}
func NewSQLMacroEngineBase() *SQLMacroEngineBase {
return &SQLMacroEngineBase{}
}
func (m *SQLMacroEngineBase) ReplaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]string) string) string {
result := ""
lastIndex := 0
for _, v := range re.FindAllSubmatchIndex([]byte(str), -1) {
groups := []string{}
for i := 0; i < len(v); i += 2 {
groups = append(groups, str[v[i]:v[i+1]])
}
result += str[lastIndex:v[0]] + repl(groups)
lastIndex = v[1]
}
return result + str[lastIndex:]
}
// epochPrecisionToMS converts an epoch value to milliseconds where needed.
//
// The precision is guessed from the value's decimal exponent: values of
// magnitude 1e9 are taken to be seconds and multiplied by 1000, values of
// magnitude 1e18 are taken to be nanoseconds and divided by 1e6. Every other
// value is returned unchanged.
func epochPrecisionToMS(value float64) float64 {
	formatted := strconv.FormatFloat(value, 'e', -1, 64)

	switch {
	case strings.HasSuffix(formatted, "e+09"):
		return value * float64(1e3)
	case strings.HasSuffix(formatted, "e+18"):
		return value / float64(time.Millisecond)
	default:
		return value
	}
}