Merge remote-tracking branch 'upstream/master' into macroengine-interpolate

This commit is contained in:
Sven Klemm
2018-03-01 18:01:54 +01:00
3010 changed files with 383925 additions and 190049 deletions

View File

@@ -6,8 +6,8 @@ import (
"database/sql"
"fmt"
"math"
"reflect"
"strconv"
"time"
"github.com/go-sql-driver/mysql"
@@ -74,24 +74,36 @@ func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows,
table.Columns[i].Text = name
}
columnTypes, err := rows.ColumnTypes()
if err != nil {
return err
}
rowLimit := 1000000
rowCount := 0
timeIndex := -1
// check if there is a column named time
for i, col := range columnNames {
switch col {
case "time_sec":
timeIndex = i
}
}
for ; rows.Next(); rowCount++ {
if rowCount > rowLimit {
return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
}
values, err := e.getTypedRowData(columnTypes, rows)
values, err := e.getTypedRowData(rows)
if err != nil {
return err
}
// for annotations, convert to epoch
if timeIndex != -1 {
switch value := values[timeIndex].(type) {
case time.Time:
values[timeIndex] = float64(value.UnixNano() / 1e9)
}
}
table.Rows = append(table.Rows, values)
}
@@ -100,60 +112,20 @@ func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows,
return nil
}
func (e MysqlQueryEndpoint) getTypedRowData(types []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) {
func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, error) {
types, err := rows.ColumnTypes()
if err != nil {
return nil, err
}
values := make([]interface{}, len(types))
for i, stype := range types {
e.log.Debug("type", "type", stype)
switch stype.DatabaseTypeName() {
case mysql.FieldTypeNameTiny:
values[i] = new(int8)
case mysql.FieldTypeNameInt24:
values[i] = new(int32)
case mysql.FieldTypeNameShort:
values[i] = new(int16)
case mysql.FieldTypeNameVarString:
values[i] = new(string)
case mysql.FieldTypeNameVarChar:
values[i] = new(string)
case mysql.FieldTypeNameLong:
values[i] = new(int)
case mysql.FieldTypeNameLongLong:
values[i] = new(int64)
case mysql.FieldTypeNameDouble:
values[i] = new(float64)
case mysql.FieldTypeNameDecimal:
values[i] = new(float32)
case mysql.FieldTypeNameNewDecimal:
values[i] = new(float64)
case mysql.FieldTypeNameFloat:
values[i] = new(float64)
case mysql.FieldTypeNameTimestamp:
values[i] = new(time.Time)
case mysql.FieldTypeNameDateTime:
values[i] = new(time.Time)
case mysql.FieldTypeNameTime:
values[i] = new(string)
case mysql.FieldTypeNameYear:
values[i] = new(int16)
case mysql.FieldTypeNameNULL:
values[i] = nil
case mysql.FieldTypeNameBit:
for i := range values {
scanType := types[i].ScanType()
values[i] = reflect.New(scanType).Interface()
if types[i].DatabaseTypeName() == "BIT" {
values[i] = new([]byte)
case mysql.FieldTypeNameBLOB:
values[i] = new(string)
case mysql.FieldTypeNameTinyBLOB:
values[i] = new(string)
case mysql.FieldTypeNameMediumBLOB:
values[i] = new(string)
case mysql.FieldTypeNameLongBLOB:
values[i] = new(string)
case mysql.FieldTypeNameString:
values[i] = new(string)
case mysql.FieldTypeNameDate:
values[i] = new(string)
default:
return nil, fmt.Errorf("Database type %s not supported", stype.DatabaseTypeName())
}
}
@@ -161,14 +133,54 @@ func (e MysqlQueryEndpoint) getTypedRowData(types []*sql.ColumnType, rows *core.
return nil, err
}
for i := 0; i < len(types); i++ {
typeName := reflect.ValueOf(values[i]).Type().String()
switch typeName {
case "*sql.RawBytes":
values[i] = string(*values[i].(*sql.RawBytes))
case "*mysql.NullTime":
sqlTime := (*values[i].(*mysql.NullTime))
if sqlTime.Valid {
values[i] = sqlTime.Time
} else {
values[i] = nil
}
case "*sql.NullInt64":
nullInt64 := (*values[i].(*sql.NullInt64))
if nullInt64.Valid {
values[i] = nullInt64.Int64
} else {
values[i] = nil
}
case "*sql.NullFloat64":
nullFloat64 := (*values[i].(*sql.NullFloat64))
if nullFloat64.Valid {
values[i] = nullFloat64.Float64
} else {
values[i] = nil
}
}
if types[i].DatabaseTypeName() == "DECIMAL" {
f, err := strconv.ParseFloat(values[i].(string), 64)
if err == nil {
values[i] = f
} else {
values[i] = nil
}
}
}
return values, nil
}
func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
pointsBySeries := make(map[string]*tsdb.TimeSeries)
seriesByQueryOrder := list.New()
columnNames, err := rows.Columns()
columnNames, err := rows.Columns()
if err != nil {
return err
}