From 3b03dce3c2bb6f58cb10608d51d9457afa5227f6 Mon Sep 17 00:00:00 2001 From: Leonard Gram Date: Thu, 15 Mar 2018 15:06:54 +0100 Subject: [PATCH] mssql: timeGroup fill support added. --- pkg/tsdb/mssql/mssql.go | 52 +++++++++++++++++++++++++ pkg/tsdb/mssql/mssql_test.go | 74 ++++++++++++++++++++++++++++++++++++ 2 files changed, 126 insertions(+) diff --git a/pkg/tsdb/mssql/mssql.go b/pkg/tsdb/mssql/mssql.go index 51dc5335707..f11923b499a 100644 --- a/pkg/tsdb/mssql/mssql.go +++ b/pkg/tsdb/mssql/mssql.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/tsdb" + "math" ) type MssqlQueryEndpoint struct { @@ -176,6 +177,18 @@ func (e MssqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core. return fmt.Errorf("Found no column named time") } + fillMissing := query.Model.Get("fill").MustBool(false) + var fillInterval float64 + fillValue := null.Float{} + if fillMissing { + fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000 + if query.Model.Get("fillNull").MustBool(false) == false { + fillValue.Float64 = query.Model.Get("fillValue").MustFloat64() + fillValue.Valid = true + } + + } + for rows.Next() { var timestamp float64 var value null.Float @@ -237,6 +250,30 @@ func (e MssqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core. metric = metricColVal } + series, exist := pointsBySeries[metric] + if exist == false { + series = &tsdb.TimeSeries{Name: metric} + pointsBySeries[metric] = series + seriesByQueryOrder.PushBack(metric) + } + + if fillMissing { + var intervalStart float64 + if exist == false { + intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6) + } else { + intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval + } + + // align interval start + intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval + + for i := intervalStart; i < timestamp; i += fillInterval { + series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)}) + rowCount++ + } + } + e.appendTimePoint(pointsBySeries, seriesByQueryOrder, metric, timestamp, value) rowCount++ } @@ -245,12 +282,27 @@ func (e MssqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core. for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() { key := elem.Value.(string) result.Series = append(result.Series, pointsBySeries[key]) + + if fillMissing { + series := pointsBySeries[key] + // fill in values from last fetched value till interval end + intervalStart := series.Points[len(series.Points)-1][1].Float64 + intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6) + + // align interval start + intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval + for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval { + series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)}) + rowCount++ + } + } } result.Meta.Set("rowCount", rowCount) return nil } +// TODO: look at this, specific to the MS SQL datasource. REMOVE? func (e MssqlQueryEndpoint) appendTimePoint(pointsBySeries map[string]*tsdb.TimeSeries, seriesByQueryOrder *list.List, metric string, timestamp float64, value null.Float) { if series, exist := pointsBySeries[metric]; exist { series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)}) diff --git a/pkg/tsdb/mssql/mssql_test.go b/pkg/tsdb/mssql/mssql_test.go index f23534eedca..4c7b3110a04 100644 --- a/pkg/tsdb/mssql/mssql_test.go +++ b/pkg/tsdb/mssql/mssql_test.go @@ -259,6 +259,80 @@ func TestMSSQL(t *testing.T) { So(actualValueLast, ShouldEqual, 20) So(actualTimeLast, ShouldEqual, fromStart.Add(25*time.Minute)) }) + + Convey("When doing a metric query using timeGroup with NULL fill enabled", func() { + query := &tsdb.TsdbQuery{ + Queries: []*tsdb.Query{ + { + Model: simplejson.NewFromAny(map[string]interface{}{ + "rawSql": "SELECT $__timeGroup(time, '5m', NULL) AS time, measurement as metric, avg(value) as value FROM metric GROUP BY $__timeGroup(time, '5m'), measurement ORDER BY 1", + "format": "time_series", + }), + RefId: "A", + }, + }, + TimeRange: &tsdb.TimeRange{ + From: fmt.Sprintf("%v", fromStart.Unix()*1000), + To: fmt.Sprintf("%v", fromStart.Add(34*time.Minute).Unix()*1000), + }, + } + + resp, err := endpoint.Query(nil, nil, query) + queryResult := resp.Results["A"] + So(err, ShouldBeNil) + So(queryResult.Error, ShouldBeNil) + + points := queryResult.Series[0].Points + + So(len(points), ShouldEqual, 7) + actualValueFirst := points[0][0].Float64 + actualTimeFirst := time.Unix(int64(points[0][1].Float64)/1000, 0) + So(actualValueFirst, ShouldEqual, 15) + So(actualTimeFirst, ShouldEqual, fromStart) + + actualNullPoint := points[3][0] + actualNullTime := time.Unix(int64(points[3][1].Float64)/1000, 0) + So(actualNullPoint.Valid, ShouldBeFalse) + So(actualNullTime, ShouldEqual, fromStart.Add(15*time.Minute)) + + actualValueLast := points[5][0].Float64 + actualTimeLast := time.Unix(int64(points[5][1].Float64)/1000, 0) + So(actualValueLast, ShouldEqual, 20) + So(actualTimeLast, ShouldEqual, fromStart.Add(25*time.Minute)) + + actualLastNullPoint := points[6][0] + actualLastNullTime := time.Unix(int64(points[6][1].Float64)/1000, 0) + So(actualLastNullPoint.Valid, ShouldBeFalse) + So(actualLastNullTime, ShouldEqual, fromStart.Add(30*time.Minute)) + + }) + + Convey("When doing a metric query using timeGroup with float fill enabled", func() { + query := &tsdb.TsdbQuery{ + Queries: []*tsdb.Query{ + { + Model: simplejson.NewFromAny(map[string]interface{}{ + "rawSql": "SELECT $__timeGroup(time, '5m', 1.5) AS time, measurement as metric, avg(value) as value FROM metric GROUP BY $__timeGroup(time, '5m'), measurement ORDER BY 1", + "format": "time_series", + }), + RefId: "A", + }, + }, + TimeRange: &tsdb.TimeRange{ + From: fmt.Sprintf("%v", fromStart.Unix()*1000), + To: fmt.Sprintf("%v", fromStart.Add(34*time.Minute).Unix()*1000), + }, + } + + resp, err := endpoint.Query(nil, nil, query) + queryResult := resp.Results["A"] + So(err, ShouldBeNil) + So(queryResult.Error, ShouldBeNil) + + points := queryResult.Series[0].Points + + So(points[6][0].Float64, ShouldEqual, 1.5) + }) }) }) }