mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
InfluxDB: SQL Support (#72167)
* Add influxdbSqlSupport feature toggle * Add SQL option to the config page * Add SQL backend * Add metadata support in config page * Implement unified querying * Fix healthcheck query * fsql tests * secure grpc by default * code cleanup * Query handing for sql mode * Implement a placeholder sql editor * Fix query language dropdown * go mod updates * make lint-go * more make lint-go * remove unused runQuery * switch statements with default case * linting again
This commit is contained in:
parent
3172715a02
commit
77e7ae2a1b
@ -4010,9 +4010,7 @@ exports[`better eslint`] = {
|
||||
[0, 0, 0, "Unexpected any. Specify a different type.", "1"]
|
||||
],
|
||||
"public/app/plugins/datasource/influxdb/components/editor/config/ConfigEditor.tsx:5381": [
|
||||
[0, 0, 0, "Do not use any type assertions.", "0"],
|
||||
[0, 0, 0, "Do not use any type assertions.", "1"],
|
||||
[0, 0, 0, "Do not use any type assertions.", "2"]
|
||||
[0, 0, 0, "Do not use any type assertions.", "0"]
|
||||
],
|
||||
"public/app/plugins/datasource/influxdb/datasource.ts:5381": [
|
||||
[0, 0, 0, "Unexpected any. Specify a different type.", "0"],
|
||||
|
@ -128,6 +128,7 @@ Experimental features might be changed or removed without prior notice.
|
||||
| `awsAsyncQueryCaching` | Enable caching for async queries for Redshift and Athena. Requires that the `useCachingService` feature toggle is enabled and the datasource has caching and async query support enabled |
|
||||
| `permissionsFilterRemoveSubquery` | Alternative permission filter implementation that does not use subqueries for fetching the dashboard folder |
|
||||
| `prometheusConfigOverhaulAuth` | Update the Prometheus configuration page with the new auth component |
|
||||
| `influxdbSqlSupport` | Enable InfluxDB SQL query language support with new querying UI |
|
||||
|
||||
## Development feature toggles
|
||||
|
||||
|
20
go.mod
20
go.mod
@ -295,6 +295,8 @@ require (
|
||||
github.com/NYTimes/gziphandler v1.1.1 // indirect
|
||||
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
|
||||
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect
|
||||
github.com/apache/arrow/go/v12 v12.0.1 // indirect
|
||||
github.com/apache/thrift v0.18.1 // indirect
|
||||
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
|
||||
github.com/armon/go-metrics v0.4.1 // indirect
|
||||
github.com/blang/semver/v4 v4.0.0 // indirect
|
||||
@ -327,6 +329,7 @@ require (
|
||||
github.com/fsnotify/fsnotify v1.6.0 // indirect
|
||||
github.com/getsentry/sentry-go v0.12.0 // indirect
|
||||
github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect
|
||||
github.com/goccy/go-json v0.9.11 // indirect
|
||||
github.com/gogo/googleapis v1.4.1 // indirect
|
||||
github.com/gogo/status v1.1.1 // indirect
|
||||
github.com/google/cel-go v0.12.6 // indirect
|
||||
@ -344,12 +347,17 @@ require (
|
||||
github.com/hashicorp/memberlist v0.5.0 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
github.com/invopop/yaml v0.1.0 // indirect
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
|
||||
github.com/klauspost/asmfmt v1.3.2 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
|
||||
github.com/kr/pretty v0.3.1 // indirect
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
github.com/magiconair/properties v1.8.6 // indirect
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-ieproxy v0.0.3 // indirect
|
||||
github.com/mattn/goveralls v0.0.6 // indirect
|
||||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
|
||||
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
|
||||
github.com/mitchellh/copystructure v1.2.0 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/mitchellh/reflectwalk v1.0.2 // indirect
|
||||
@ -366,6 +374,7 @@ require (
|
||||
github.com/pborman/uuid v1.2.0 // indirect
|
||||
github.com/pelletier/go-toml v1.9.5 // indirect
|
||||
github.com/perimeterx/marshmallow v1.1.4 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
github.com/rivo/uniseg v0.3.4 // indirect
|
||||
github.com/rogpeppe/go-internal v1.10.0 // indirect
|
||||
github.com/rueian/rueidis v0.0.100 // indirect
|
||||
@ -385,6 +394,7 @@ require (
|
||||
github.com/weaveworks/promrus v1.2.0 // indirect
|
||||
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
|
||||
github.com/yuin/gopher-lua v1.1.0 // indirect
|
||||
github.com/zeebo/xxh3 v1.0.2 // indirect
|
||||
go.etcd.io/etcd/api/v3 v3.5.7 // indirect
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.7 // indirect
|
||||
go.etcd.io/etcd/client/v3 v3.5.7 // indirect
|
||||
@ -404,6 +414,16 @@ require (
|
||||
k8s.io/kms v0.27.1 // indirect
|
||||
k8s.io/kube-aggregator v0.27.1 // indirect
|
||||
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
|
||||
lukechampine.com/uint128 v1.2.0 // indirect
|
||||
modernc.org/cc/v3 v3.40.0 // indirect
|
||||
modernc.org/ccgo/v3 v3.16.13 // indirect
|
||||
modernc.org/libc v1.22.2 // indirect
|
||||
modernc.org/mathutil v1.5.0 // indirect
|
||||
modernc.org/memory v1.5.0 // indirect
|
||||
modernc.org/opt v0.1.3 // indirect
|
||||
modernc.org/sqlite v1.18.2 // indirect
|
||||
modernc.org/strutil v1.1.3 // indirect
|
||||
modernc.org/token v1.1.0 // indirect
|
||||
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2 // indirect
|
||||
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
|
||||
|
33
go.sum
33
go.sum
@ -696,9 +696,13 @@ github.com/apache/arrow/go/arrow v0.0.0-20210223225224-5bea62493d91/go.mod h1:c9
|
||||
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 h1:q4dksr6ICHXqG5hm0ZW5IHyeEJXoIJSOZeBLmWPNeIQ=
|
||||
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs=
|
||||
github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0=
|
||||
github.com/apache/arrow/go/v12 v12.0.1 h1:JsR2+hzYYjgSUkBSaahpqCetqZMr76djX80fF/DiJbg=
|
||||
github.com/apache/arrow/go/v12 v12.0.1/go.mod h1:weuTY7JvTG/HDPtMQxEUp7pU73vkLWMLpY67QwZ/WWw=
|
||||
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
||||
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
||||
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
|
||||
github.com/apache/thrift v0.18.1 h1:lNhK/1nqjbwbiOPDBPFJVKxgDEGSepKuTh6OLiXW8kg=
|
||||
github.com/apache/thrift v0.18.1/go.mod h1:rdQn/dCcDKEWjjylUeueum4vQEjG2v8v2PqriUnbr+I=
|
||||
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
|
||||
github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk=
|
||||
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
|
||||
@ -1544,6 +1548,7 @@ github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJA
|
||||
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
|
||||
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
|
||||
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
|
||||
github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk=
|
||||
github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
||||
github.com/goccy/go-yaml v1.9.5/go.mod h1:U/jl18uSupI5rdI2jmuCswEA2htH9eXfferR3KfscvA=
|
||||
github.com/gocql/gocql v0.0.0-20190301043612-f6df8288f9b4/go.mod h1:4Fw1eo5iaEhDUs8XyuhSVCVy52Jq3L+/3GJgYkwc+/0=
|
||||
@ -2082,6 +2087,7 @@ github.com/kataras/iris/v12 v12.1.8/go.mod h1:LMYy4VlP67TQ3Zgriz8RE2h2kMZV2SgMYb
|
||||
github.com/kataras/neffos v0.0.14/go.mod h1:8lqADm8PnbeFfL7CLXh1WHw53dG27MC3pgi2R1rmoTE=
|
||||
github.com/kataras/pio v0.0.2/go.mod h1:hAoW0t9UmXi4R5Oyq5Z4irTbaTsOemSrDGUtaTl7Dro=
|
||||
github.com/kataras/sitemap v0.0.5/go.mod h1:KY2eugMKiPwsJgx7+U103YZehfvNGOXURubcGyk0Bz8=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
|
||||
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
|
||||
github.com/kevinburke/ssh_config v0.0.0-20201106050909-4977a11b4351 h1:DowS9hvgyYSX4TO5NpyC606/Z4SxnNYbT+WX27or6Ck=
|
||||
@ -2090,6 +2096,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
|
||||
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
|
||||
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
|
||||
github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
|
||||
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
|
||||
@ -2102,8 +2109,11 @@ github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47e
|
||||
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
|
||||
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
|
||||
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||
github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=
|
||||
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
|
||||
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
|
||||
github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
|
||||
github.com/knadh/koanf v0.14.1-0.20201201075439-e0853799f9ec/go.mod h1:H5mEFsTeWizwFXHKtsITL5ipsLTuAMQoGuQpp+1JL9U=
|
||||
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00=
|
||||
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
|
||||
@ -2255,7 +2265,9 @@ github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJys
|
||||
github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4=
|
||||
github.com/miekg/dns v1.1.51 h1:0+Xg7vObnhrz/4ZCZcZh7zPXlmU0aveS2HDBd0m0qSo=
|
||||
github.com/miekg/dns v1.1.51/go.mod h1:2Z9d3CP1LQWihRZUf29mQ19yDThaI4DAYzte2CaQW5c=
|
||||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
|
||||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
|
||||
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
|
||||
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
|
||||
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
||||
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
||||
@ -2574,6 +2586,8 @@ github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSx
|
||||
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
github.com/rhnvrm/simples3 v0.5.0/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
|
||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||
github.com/rivo/uniseg v0.3.4 h1:3Z3Eu6FGHZWSfNKJTOUiPatWwfc7DzJRU04jFUqJODw=
|
||||
@ -2884,6 +2898,7 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t
|
||||
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
|
||||
github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
|
||||
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
|
||||
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
|
||||
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
|
||||
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
|
||||
github.com/zenazn/goji v1.0.1/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
|
||||
@ -3469,6 +3484,7 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
@ -4054,17 +4070,23 @@ k8s.io/utils v0.0.0-20230308161112-d77c459e9343/go.mod h1:OLgZIPagt7ERELqWJFomSt
|
||||
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 h1:qY1Ad8PODbnymg2pRbkyMT/ylpTrCM8P2RJ0yroCyIk=
|
||||
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
|
||||
lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk=
|
||||
lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI=
|
||||
lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk=
|
||||
modernc.org/cc v1.0.0 h1:nPibNuDEx6tvYrUAtvDTTw98rx5juGsa5zuDnKwEEQQ=
|
||||
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
|
||||
modernc.org/cc/v3 v3.36.0/go.mod h1:NFUHyPn4ekoC/JHeZFfZurN6ixxawE1BnVonP/oahEI=
|
||||
modernc.org/cc/v3 v3.36.2/go.mod h1:NFUHyPn4ekoC/JHeZFfZurN6ixxawE1BnVonP/oahEI=
|
||||
modernc.org/cc/v3 v3.36.3/go.mod h1:NFUHyPn4ekoC/JHeZFfZurN6ixxawE1BnVonP/oahEI=
|
||||
modernc.org/cc/v3 v3.40.0 h1:P3g79IUS/93SYhtoeaHW+kRCIrYaxJ27MFPv+7kaTOw=
|
||||
modernc.org/cc/v3 v3.40.0/go.mod h1:/bTg4dnWkSXowUO6ssQKnOV0yMVxDYNIsIrzqTFDGH0=
|
||||
modernc.org/ccgo/v3 v3.0.0-20220428102840-41399a37e894/go.mod h1:eI31LL8EwEBKPpNpA4bU1/i+sKOwOrQy8D87zWUcRZc=
|
||||
modernc.org/ccgo/v3 v3.0.0-20220430103911-bc99d88307be/go.mod h1:bwdAnOoaIt8Ax9YdWGjxWsdkPcZyRPHqrOvJxaKAKGw=
|
||||
modernc.org/ccgo/v3 v3.16.4/go.mod h1:tGtX0gE9Jn7hdZFeU88slbTh1UtCYKusWOoCJuvkWsQ=
|
||||
modernc.org/ccgo/v3 v3.16.6/go.mod h1:tGtX0gE9Jn7hdZFeU88slbTh1UtCYKusWOoCJuvkWsQ=
|
||||
modernc.org/ccgo/v3 v3.16.8/go.mod h1:zNjwkizS+fIFDrDjIAgBSCLkWbJuHF+ar3QRn+Z9aws=
|
||||
modernc.org/ccgo/v3 v3.16.9/go.mod h1:zNMzC9A9xeNUepy6KuZBbugn3c0Mc9TeiJO4lgvkJDo=
|
||||
modernc.org/ccgo/v3 v3.16.13 h1:Mkgdzl46i5F/CNR/Kj80Ri59hC8TKAhZrYSaqvkwzUw=
|
||||
modernc.org/ccgo/v3 v3.16.13/go.mod h1:2Quk+5YgpImhPjv2Qsob1DnZ/4som1lJTodubIcoUkY=
|
||||
modernc.org/ccorpus v1.11.6/go.mod h1:2gEUTrWqdpH2pXsmTM1ZkjeSrUWDpjMu2T6m29L/ErQ=
|
||||
modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
|
||||
modernc.org/httpfs v1.0.6/go.mod h1:7dosgurJGp0sPaRanU53W4xZYKh14wfzX420oZADeHM=
|
||||
@ -4075,21 +4097,32 @@ modernc.org/libc v1.16.17/go.mod h1:hYIV5VZczAmGZAnG15Vdngn5HSF5cSkbvfz2B7GRuVU=
|
||||
modernc.org/libc v1.16.19/go.mod h1:p7Mg4+koNjc8jkqwcoFBJx7tXkpj00G77X7A72jXPXA=
|
||||
modernc.org/libc v1.17.0/go.mod h1:XsgLldpP4aWlPlsjqKRdHPqCxCjISdHfM/yeWC5GyW0=
|
||||
modernc.org/libc v1.17.1/go.mod h1:FZ23b+8LjxZs7XtFMbSzL/EhPxNbfZbErxEHc7cbD9s=
|
||||
modernc.org/libc v1.22.2 h1:4U7v51GyhlWqQmwCHj28Rdq2Yzwk55ovjFrdPjs8Hb0=
|
||||
modernc.org/libc v1.22.2/go.mod h1:uvQavJ1pZ0hIoC/jfqNoMLURIMhKzINIWypNM17puug=
|
||||
modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k=
|
||||
modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
|
||||
modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
|
||||
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
|
||||
modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
|
||||
modernc.org/memory v1.1.1/go.mod h1:/0wo5ibyrQiaoUoH7f9D8dnglAmILJ5/cxZlRECf+Nw=
|
||||
modernc.org/memory v1.2.0/go.mod h1:/0wo5ibyrQiaoUoH7f9D8dnglAmILJ5/cxZlRECf+Nw=
|
||||
modernc.org/memory v1.2.1/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU=
|
||||
modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds=
|
||||
modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU=
|
||||
modernc.org/opt v0.1.1/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
|
||||
modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4=
|
||||
modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
|
||||
modernc.org/sqlite v1.18.1/go.mod h1:6ho+Gow7oX5V+OiOQ6Tr4xeqbx13UZ6t+Fw9IRUG4d4=
|
||||
modernc.org/sqlite v1.18.2 h1:S2uFiaNPd/vTAP/4EmyY8Qe2Quzu26A2L1e25xRNTio=
|
||||
modernc.org/sqlite v1.18.2/go.mod h1:kvrTLEWgxUcHa2GfHBQtanR1H9ht3hTJNtKpzH9k1u0=
|
||||
modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
|
||||
modernc.org/strutil v1.1.1/go.mod h1:DE+MQQ/hjKBZS2zNInV5hhcipt5rLPWkmpbGeW5mmdw=
|
||||
modernc.org/strutil v1.1.3 h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY=
|
||||
modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw=
|
||||
modernc.org/tcl v1.13.1/go.mod h1:XOLfOwzhkljL4itZkK6T72ckMgvj0BDsnKNdZVUOecw=
|
||||
modernc.org/token v1.0.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
|
||||
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
|
||||
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
|
||||
modernc.org/xc v1.0.0/go.mod h1:mRNCo0bvLjGhHO9WsyuKVU4q0ceiDDDoEeWDJHrNx8I=
|
||||
modernc.org/z v1.5.1/go.mod h1:eWFB510QWW5Th9YGZT81s+LwvaAs3Q2yr4sP0rmLkv8=
|
||||
nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
|
||||
|
@ -117,4 +117,5 @@ export interface FeatureToggles {
|
||||
permissionsFilterRemoveSubquery?: boolean;
|
||||
prometheusConfigOverhaulAuth?: boolean;
|
||||
configurableSchedulerTick?: boolean;
|
||||
influxdbSqlSupport?: boolean;
|
||||
}
|
||||
|
@ -683,5 +683,13 @@ var (
|
||||
RequiresRestart: true,
|
||||
HideFromDocs: true,
|
||||
},
|
||||
{
|
||||
Name: "influxdbSqlSupport",
|
||||
Description: "Enable InfluxDB SQL query language support with new querying UI",
|
||||
Stage: FeatureStageExperimental,
|
||||
FrontendOnly: false,
|
||||
Owner: grafanaObservabilityMetricsSquad,
|
||||
RequiresRestart: false,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
@ -98,3 +98,4 @@ azureMonitorDataplane,GA,@grafana/partner-datasources,false,false,false,false
|
||||
permissionsFilterRemoveSubquery,experimental,@grafana/backend-platform,false,false,false,false
|
||||
prometheusConfigOverhaulAuth,experimental,@grafana/observability-metrics,false,false,false,false
|
||||
configurableSchedulerTick,experimental,@grafana/alerting-squad,false,false,true,false
|
||||
influxdbSqlSupport,experimental,@grafana/observability-metrics,false,false,false,false
|
||||
|
|
@ -402,4 +402,8 @@ const (
|
||||
// FlagConfigurableSchedulerTick
|
||||
// Enable changing the scheduler base interval via configuration option unified_alerting.scheduler_tick_interval
|
||||
FlagConfigurableSchedulerTick = "configurableSchedulerTick"
|
||||
|
||||
// FlagInfluxdbSqlSupport
|
||||
// Enable InfluxDB SQL query language support with new querying UI
|
||||
FlagInfluxdbSqlSupport = "influxdbSqlSupport"
|
||||
)
|
||||
|
277
pkg/tsdb/influxdb/fsql/arrow.go
Normal file
277
pkg/tsdb/influxdb/fsql/arrow.go
Normal file
@ -0,0 +1,277 @@
|
||||
package fsql
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"runtime/debug"
|
||||
"time"
|
||||
|
||||
"github.com/apache/arrow/go/v12/arrow"
|
||||
"github.com/apache/arrow/go/v12/arrow/array"
|
||||
"github.com/apache/arrow/go/v12/arrow/scalar"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
// TODO: Make this configurable. This is an arbitrary value right
|
||||
// now. Grafana used to have a 1M row rowLimit established in open-source. I'll
|
||||
// let users hit that for now until we decide how to proceed.
|
||||
const rowLimit = 1_000_000
|
||||
|
||||
type recordReader interface {
|
||||
Next() bool
|
||||
Schema() *arrow.Schema
|
||||
Record() arrow.Record
|
||||
Err() error
|
||||
}
|
||||
|
||||
// newQueryDataResponse builds a [backend.DataResponse] from a stream of
|
||||
// [arrow.Record]s.
|
||||
//
|
||||
// The backend.DataResponse contains a single [data.Frame].
|
||||
func newQueryDataResponse(reader recordReader, query sqlutil.Query, headers metadata.MD) backend.DataResponse {
|
||||
var resp backend.DataResponse
|
||||
frame, err := frameForRecords(reader)
|
||||
if err != nil {
|
||||
resp.Error = err
|
||||
}
|
||||
if frame.Rows() == 0 {
|
||||
resp.Frames = data.Frames{}
|
||||
return resp
|
||||
}
|
||||
|
||||
frame.Meta.Custom = map[string]any{
|
||||
"headers": headers,
|
||||
}
|
||||
frame.Meta.ExecutedQueryString = query.RawSQL
|
||||
frame.Meta.DataTopic = data.DataTopic(query.RawSQL)
|
||||
|
||||
switch query.Format {
|
||||
case sqlutil.FormatOptionTimeSeries:
|
||||
if _, idx := frame.FieldByName("time"); idx == -1 {
|
||||
resp.Error = fmt.Errorf("no time column found")
|
||||
return resp
|
||||
}
|
||||
|
||||
if frame.TimeSeriesSchema().Type == data.TimeSeriesTypeLong {
|
||||
var err error
|
||||
frame, err = data.LongToWide(frame, nil)
|
||||
if err != nil {
|
||||
resp.Error = err
|
||||
return resp
|
||||
}
|
||||
}
|
||||
case sqlutil.FormatOptionTable:
|
||||
// No changes to the output. Send it as is.
|
||||
case sqlutil.FormatOptionLogs:
|
||||
// TODO(brett): We need to find out what this actually is and if its
|
||||
// worth supporting. Pass through as "table" for now.
|
||||
default:
|
||||
resp.Error = fmt.Errorf("unsupported format")
|
||||
}
|
||||
|
||||
resp.Frames = data.Frames{frame}
|
||||
return resp
|
||||
}
|
||||
|
||||
// frameForRecords creates a [data.Frame] from a stream of [arrow.Record]s.
|
||||
func frameForRecords(reader recordReader) (*data.Frame, error) {
|
||||
var (
|
||||
frame = newFrame(reader.Schema())
|
||||
rows int64
|
||||
)
|
||||
for reader.Next() {
|
||||
record := reader.Record()
|
||||
for i, col := range record.Columns() {
|
||||
if err := copyData(frame.Fields[i], col); err != nil {
|
||||
return frame, err
|
||||
}
|
||||
}
|
||||
|
||||
rows += record.NumRows()
|
||||
if rows > rowLimit {
|
||||
frame.AppendNotices(data.Notice{
|
||||
Severity: data.NoticeSeverityWarning,
|
||||
Text: fmt.Sprintf("Results have been limited to %v because the SQL row limit was reached", rowLimit),
|
||||
})
|
||||
return frame, nil
|
||||
}
|
||||
|
||||
if err := reader.Err(); err != nil && !errors.Is(err, io.EOF) {
|
||||
return frame, err
|
||||
}
|
||||
}
|
||||
return frame, nil
|
||||
}
|
||||
|
||||
// newFrame builds a new Data Frame from an Arrow Schema.
|
||||
func newFrame(schema *arrow.Schema) *data.Frame {
|
||||
fields := schema.Fields()
|
||||
df := &data.Frame{
|
||||
Fields: make([]*data.Field, len(fields)),
|
||||
Meta: &data.FrameMeta{},
|
||||
}
|
||||
for i, f := range fields {
|
||||
df.Fields[i] = newField(f)
|
||||
}
|
||||
return df
|
||||
}
|
||||
|
||||
func newField(f arrow.Field) *data.Field {
|
||||
switch f.Type.ID() {
|
||||
case arrow.STRING:
|
||||
return newDataField[string](f)
|
||||
case arrow.FLOAT32:
|
||||
return newDataField[float32](f)
|
||||
case arrow.FLOAT64:
|
||||
return newDataField[float64](f)
|
||||
case arrow.UINT8:
|
||||
return newDataField[uint8](f)
|
||||
case arrow.UINT16:
|
||||
return newDataField[uint16](f)
|
||||
case arrow.UINT32:
|
||||
return newDataField[uint32](f)
|
||||
case arrow.UINT64:
|
||||
return newDataField[uint64](f)
|
||||
case arrow.INT8:
|
||||
return newDataField[int8](f)
|
||||
case arrow.INT16:
|
||||
return newDataField[int16](f)
|
||||
case arrow.INT32:
|
||||
return newDataField[int32](f)
|
||||
case arrow.INT64:
|
||||
return newDataField[int64](f)
|
||||
case arrow.BOOL:
|
||||
return newDataField[bool](f)
|
||||
case arrow.TIMESTAMP:
|
||||
return newDataField[time.Time](f)
|
||||
case arrow.DURATION:
|
||||
return newDataField[int64](f)
|
||||
default:
|
||||
return newDataField[json.RawMessage](f)
|
||||
}
|
||||
}
|
||||
|
||||
func newDataField[T any](f arrow.Field) *data.Field {
|
||||
if f.Nullable {
|
||||
var s []*T
|
||||
return data.NewField(f.Name, nil, s)
|
||||
}
|
||||
var s []T
|
||||
return data.NewField(f.Name, nil, s)
|
||||
}
|
||||
|
||||
// copyData copies the contents of an Arrow column into a Data Frame field.
|
||||
func copyData(field *data.Field, col arrow.Array) error {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
fmt.Println(fmt.Errorf("panic: %s %s", r, string(debug.Stack())))
|
||||
}
|
||||
}()
|
||||
|
||||
colData := col.Data()
|
||||
|
||||
switch col.DataType().ID() {
|
||||
case arrow.TIMESTAMP:
|
||||
v := array.NewTimestampData(colData)
|
||||
for i := 0; i < v.Len(); i++ {
|
||||
if field.Nullable() {
|
||||
if v.IsNull(i) {
|
||||
var t *time.Time
|
||||
field.Append(t)
|
||||
continue
|
||||
}
|
||||
t := v.Value(i).ToTime(arrow.Nanosecond)
|
||||
field.Append(&t)
|
||||
continue
|
||||
}
|
||||
field.Append(v.Value(i).ToTime(arrow.Nanosecond))
|
||||
}
|
||||
case arrow.DENSE_UNION:
|
||||
v := array.NewDenseUnionData(colData)
|
||||
for i := 0; i < v.Len(); i++ {
|
||||
sc, err := scalar.GetScalar(v, i)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
value := sc.(*scalar.DenseUnion).ChildValue()
|
||||
|
||||
var d any
|
||||
switch value.DataType().ID() {
|
||||
case arrow.STRING:
|
||||
d = value.(*scalar.String).String()
|
||||
case arrow.BOOL:
|
||||
d = value.(*scalar.Boolean).Value
|
||||
case arrow.INT32:
|
||||
d = value.(*scalar.Int32).Value
|
||||
case arrow.INT64:
|
||||
d = value.(*scalar.Int64).Value
|
||||
case arrow.LIST:
|
||||
d = value.(*scalar.List).Value
|
||||
default:
|
||||
d = value.(*scalar.Null)
|
||||
}
|
||||
b, err := json.Marshal(d)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
field.Append(json.RawMessage(b))
|
||||
}
|
||||
case arrow.STRING:
|
||||
copyBasic[string](field, array.NewStringData(colData))
|
||||
case arrow.UINT8:
|
||||
copyBasic[uint8](field, array.NewUint8Data(colData))
|
||||
case arrow.UINT16:
|
||||
copyBasic[uint16](field, array.NewUint16Data(colData))
|
||||
case arrow.UINT32:
|
||||
copyBasic[uint32](field, array.NewUint32Data(colData))
|
||||
case arrow.UINT64:
|
||||
copyBasic[uint64](field, array.NewUint64Data(colData))
|
||||
case arrow.INT8:
|
||||
copyBasic[int8](field, array.NewInt8Data(colData))
|
||||
case arrow.INT16:
|
||||
copyBasic[int16](field, array.NewInt16Data(colData))
|
||||
case arrow.INT32:
|
||||
copyBasic[int32](field, array.NewInt32Data(colData))
|
||||
case arrow.INT64:
|
||||
copyBasic[int64](field, array.NewInt64Data(colData))
|
||||
case arrow.FLOAT32:
|
||||
copyBasic[float32](field, array.NewFloat32Data(colData))
|
||||
case arrow.FLOAT64:
|
||||
copyBasic[float64](field, array.NewFloat64Data(colData))
|
||||
case arrow.BOOL:
|
||||
copyBasic[bool](field, array.NewBooleanData(colData))
|
||||
case arrow.DURATION:
|
||||
copyBasic[int64](field, array.NewInt64Data(colData))
|
||||
default:
|
||||
fmt.Printf("datatype %s is unhandled", col.DataType().ID())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type arrowArray[T any] interface {
|
||||
IsNull(int) bool
|
||||
Value(int) T
|
||||
Len() int
|
||||
}
|
||||
|
||||
func copyBasic[T any, Array arrowArray[T]](dst *data.Field, src Array) {
|
||||
for i := 0; i < src.Len(); i++ {
|
||||
if dst.Nullable() {
|
||||
if src.IsNull(i) {
|
||||
var s *T
|
||||
dst.Append(s)
|
||||
continue
|
||||
}
|
||||
s := src.Value(i)
|
||||
dst.Append(&s)
|
||||
continue
|
||||
}
|
||||
dst.Append(src.Value(i))
|
||||
}
|
||||
}
|
496
pkg/tsdb/influxdb/fsql/arrow_test.go
Normal file
496
pkg/tsdb/influxdb/fsql/arrow_test.go
Normal file
@ -0,0 +1,496 @@
|
||||
package fsql
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/apache/arrow/go/v12/arrow"
|
||||
"github.com/apache/arrow/go/v12/arrow/array"
|
||||
"github.com/apache/arrow/go/v12/arrow/memory"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
func TestNewQueryDataResponse(t *testing.T) {
|
||||
alloc := memory.DefaultAllocator
|
||||
schema := arrow.NewSchema(
|
||||
[]arrow.Field{
|
||||
{Name: "i8", Type: arrow.PrimitiveTypes.Int8},
|
||||
{Name: "i16", Type: arrow.PrimitiveTypes.Int16},
|
||||
{Name: "i32", Type: arrow.PrimitiveTypes.Int32},
|
||||
{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
|
||||
|
||||
{Name: "u8", Type: arrow.PrimitiveTypes.Uint8},
|
||||
{Name: "u16", Type: arrow.PrimitiveTypes.Uint16},
|
||||
{Name: "u32", Type: arrow.PrimitiveTypes.Uint32},
|
||||
{Name: "u64", Type: arrow.PrimitiveTypes.Uint64},
|
||||
|
||||
{Name: "f32", Type: arrow.PrimitiveTypes.Float32},
|
||||
{Name: "f64", Type: arrow.PrimitiveTypes.Float64},
|
||||
|
||||
{Name: "utf8", Type: &arrow.StringType{}},
|
||||
{Name: "duration", Type: &arrow.DurationType{}},
|
||||
{Name: "timestamp", Type: &arrow.TimestampType{}},
|
||||
},
|
||||
nil,
|
||||
)
|
||||
|
||||
strValues := []jsonArray{
|
||||
newJSONArray(`[1, -2, 3]`, arrow.PrimitiveTypes.Int8),
|
||||
newJSONArray(`[1, -2, 3]`, arrow.PrimitiveTypes.Int16),
|
||||
newJSONArray(`[1, -2, 3]`, arrow.PrimitiveTypes.Int32),
|
||||
newJSONArray(`[1, -2, 3]`, arrow.PrimitiveTypes.Int64),
|
||||
|
||||
newJSONArray(`[1, 2, 3]`, arrow.PrimitiveTypes.Uint8),
|
||||
newJSONArray(`[1, 2, 3]`, arrow.PrimitiveTypes.Uint16),
|
||||
newJSONArray(`[1, 2, 3]`, arrow.PrimitiveTypes.Uint32),
|
||||
newJSONArray(`[1, 2, 3]`, arrow.PrimitiveTypes.Uint64),
|
||||
|
||||
newJSONArray(`[1.1, -2.2, 3.0]`, arrow.PrimitiveTypes.Float32),
|
||||
newJSONArray(`[1.1, -2.2, 3.0]`, arrow.PrimitiveTypes.Float64),
|
||||
|
||||
newJSONArray(`["foo", "bar", "baz"]`, &arrow.StringType{}),
|
||||
newJSONArray(`[0, 1, -2]`, &arrow.DurationType{}),
|
||||
newJSONArray(`[0, 1, 2]`, &arrow.TimestampType{}),
|
||||
}
|
||||
|
||||
var arr []arrow.Array
|
||||
for _, v := range strValues {
|
||||
tarr, _, err := array.FromJSON(
|
||||
alloc,
|
||||
v.dt,
|
||||
strings.NewReader(v.json),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
arr = append(arr, tarr)
|
||||
}
|
||||
|
||||
record := array.NewRecord(schema, arr, -1)
|
||||
records := []arrow.Record{record}
|
||||
reader, err := array.NewRecordReader(schema, records)
|
||||
assert.NoError(t, err)
|
||||
|
||||
query := sqlutil.Query{Format: sqlutil.FormatOptionTable}
|
||||
resp := newQueryDataResponse(errReader{RecordReader: reader}, query, metadata.MD{})
|
||||
assert.NoError(t, resp.Error)
|
||||
assert.Len(t, resp.Frames, 1)
|
||||
assert.Len(t, resp.Frames[0].Fields, 13)
|
||||
|
||||
frame := resp.Frames[0]
|
||||
f0 := frame.Fields[0]
|
||||
assert.Equal(t, f0.Name, "i8")
|
||||
assert.Equal(t, f0.Type(), data.FieldTypeInt8)
|
||||
assert.Equal(t, []int8{1, -2, 3}, extractFieldValues[int8](t, f0))
|
||||
|
||||
f1 := frame.Fields[1]
|
||||
assert.Equal(t, f1.Name, "i16")
|
||||
assert.Equal(t, f1.Type(), data.FieldTypeInt16)
|
||||
assert.Equal(t, []int16{1, -2, 3}, extractFieldValues[int16](t, f1))
|
||||
|
||||
f2 := frame.Fields[2]
|
||||
assert.Equal(t, f2.Name, "i32")
|
||||
assert.Equal(t, f2.Type(), data.FieldTypeInt32)
|
||||
assert.Equal(t, []int32{1, -2, 3}, extractFieldValues[int32](t, f2))
|
||||
|
||||
f3 := frame.Fields[3]
|
||||
assert.Equal(t, f3.Name, "i64")
|
||||
assert.Equal(t, f3.Type(), data.FieldTypeInt64)
|
||||
assert.Equal(t, []int64{1, -2, 3}, extractFieldValues[int64](t, f3))
|
||||
|
||||
f4 := frame.Fields[4]
|
||||
assert.Equal(t, f4.Name, "u8")
|
||||
assert.Equal(t, f4.Type(), data.FieldTypeUint8)
|
||||
assert.Equal(t, []uint8{1, 2, 3}, extractFieldValues[uint8](t, f4))
|
||||
|
||||
f5 := frame.Fields[5]
|
||||
assert.Equal(t, f5.Name, "u16")
|
||||
assert.Equal(t, f5.Type(), data.FieldTypeUint16)
|
||||
assert.Equal(t, []uint16{1, 2, 3}, extractFieldValues[uint16](t, f5))
|
||||
|
||||
f6 := frame.Fields[6]
|
||||
assert.Equal(t, f6.Name, "u32")
|
||||
assert.Equal(t, f6.Type(), data.FieldTypeUint32)
|
||||
assert.Equal(t, []uint32{1, 2, 3}, extractFieldValues[uint32](t, f6))
|
||||
|
||||
f7 := frame.Fields[7]
|
||||
assert.Equal(t, f7.Name, "u64")
|
||||
assert.Equal(t, f7.Type(), data.FieldTypeUint64)
|
||||
assert.Equal(t, []uint64{1, 2, 3}, extractFieldValues[uint64](t, f7))
|
||||
|
||||
f8 := frame.Fields[8]
|
||||
assert.Equal(t, f8.Name, "f32")
|
||||
assert.Equal(t, f8.Type(), data.FieldTypeFloat32)
|
||||
assert.Equal(t, []float32{1.1, -2.2, 3.0}, extractFieldValues[float32](t, f8))
|
||||
|
||||
f9 := frame.Fields[9]
|
||||
assert.Equal(t, f9.Name, "f64")
|
||||
assert.Equal(t, f9.Type(), data.FieldTypeFloat64)
|
||||
assert.Equal(t, []float64{1.1, -2.2, 3.0}, extractFieldValues[float64](t, f9))
|
||||
|
||||
f10 := frame.Fields[10]
|
||||
assert.Equal(t, f10.Name, "utf8")
|
||||
assert.Equal(t, f10.Type(), data.FieldTypeString)
|
||||
assert.Equal(t, []string{"foo", "bar", "baz"}, extractFieldValues[string](t, f10))
|
||||
|
||||
f11 := frame.Fields[11]
|
||||
assert.Equal(t, f11.Name, "duration")
|
||||
assert.Equal(t, f11.Type(), data.FieldTypeInt64)
|
||||
assert.Equal(t, []int64{0, 1, -2}, extractFieldValues[int64](t, f11))
|
||||
|
||||
f12 := frame.Fields[12]
|
||||
assert.Equal(t, f12.Name, "timestamp")
|
||||
assert.Equal(t, f12.Type(), data.FieldTypeTime)
|
||||
assert.Equal(t,
|
||||
[]time.Time{
|
||||
time.Unix(0, 0).UTC(),
|
||||
time.Unix(0, 1).UTC(),
|
||||
time.Unix(0, 2).UTC(),
|
||||
},
|
||||
extractFieldValues[time.Time](t, f12),
|
||||
)
|
||||
}
|
||||
|
||||
type jsonArray struct {
|
||||
json string
|
||||
dt arrow.DataType
|
||||
}
|
||||
|
||||
func newJSONArray(json string, dt arrow.DataType) jsonArray {
|
||||
return jsonArray{json: json, dt: dt}
|
||||
}
|
||||
|
||||
func TestNewQueryDataResponse_Error(t *testing.T) {
|
||||
alloc := memory.DefaultAllocator
|
||||
schema := arrow.NewSchema(
|
||||
[]arrow.Field{
|
||||
{Name: "f1-i64", Type: arrow.PrimitiveTypes.Int64},
|
||||
{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
|
||||
},
|
||||
nil,
|
||||
)
|
||||
|
||||
i64s, _, err := array.FromJSON(
|
||||
alloc,
|
||||
&arrow.Int64Type{},
|
||||
strings.NewReader(`[1, 2, 3]`),
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
f64s, _, err := array.FromJSON(
|
||||
alloc,
|
||||
&arrow.Float64Type{},
|
||||
strings.NewReader(`[1.1, 2.2, 3.3]`),
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
|
||||
record := array.NewRecord(schema, []arrow.Array{i64s, f64s}, -1)
|
||||
records := []arrow.Record{record}
|
||||
reader, err := array.NewRecordReader(schema, records)
|
||||
assert.NoError(t, err)
|
||||
|
||||
wrappedReader := errReader{
|
||||
RecordReader: reader,
|
||||
err: fmt.Errorf("explosion!"),
|
||||
}
|
||||
query := sqlutil.Query{Format: sqlutil.FormatOptionTable}
|
||||
resp := newQueryDataResponse(wrappedReader, query, metadata.MD{})
|
||||
assert.Error(t, resp.Error)
|
||||
assert.Equal(t, fmt.Errorf("explosion!"), resp.Error)
|
||||
}
|
||||
|
||||
func TestNewQueryDataResponse_WideTable(t *testing.T) {
|
||||
alloc := memory.DefaultAllocator
|
||||
schema := arrow.NewSchema(
|
||||
[]arrow.Field{
|
||||
{Name: "time", Type: &arrow.TimestampType{}},
|
||||
{Name: "label", Type: &arrow.StringType{}},
|
||||
{Name: "value", Type: arrow.PrimitiveTypes.Int64},
|
||||
},
|
||||
nil,
|
||||
)
|
||||
|
||||
times, _, err := array.FromJSON(
|
||||
alloc,
|
||||
&arrow.TimestampType{},
|
||||
strings.NewReader(`["2023-01-01T00:00:00Z", "2023-01-01T00:00:01Z", "2023-01-01T00:00:02Z"]`),
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
strs, _, err := array.FromJSON(
|
||||
alloc,
|
||||
&arrow.StringType{},
|
||||
strings.NewReader(`["foo", "bar", "baz"]`),
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
i64s, _, err := array.FromJSON(
|
||||
alloc,
|
||||
arrow.PrimitiveTypes.Int64,
|
||||
strings.NewReader(`[1, 2, 3]`),
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
|
||||
record := array.NewRecord(schema, []arrow.Array{times, strs, i64s}, -1)
|
||||
records := []arrow.Record{record}
|
||||
reader, err := array.NewRecordReader(schema, records)
|
||||
assert.NoError(t, err)
|
||||
|
||||
resp := newQueryDataResponse(errReader{RecordReader: reader}, sqlutil.Query{}, metadata.MD{})
|
||||
assert.NoError(t, resp.Error)
|
||||
assert.Len(t, resp.Frames, 1)
|
||||
assert.Equal(t, 3, resp.Frames[0].Rows())
|
||||
assert.Len(t, resp.Frames[0].Fields, 4)
|
||||
|
||||
frame := resp.Frames[0]
|
||||
assert.Equal(t, "time", frame.Fields[0].Name)
|
||||
|
||||
// label=bar
|
||||
assert.Equal(t, "value", frame.Fields[1].Name)
|
||||
assert.Equal(t, data.Labels{"label": "bar"}, frame.Fields[1].Labels)
|
||||
assert.Equal(t, []int64{0, 2, 0}, extractFieldValues[int64](t, frame.Fields[1]))
|
||||
|
||||
// label=baz
|
||||
assert.Equal(t, "value", frame.Fields[2].Name)
|
||||
assert.Equal(t, data.Labels{"label": "baz"}, frame.Fields[2].Labels)
|
||||
assert.Equal(t, []int64{0, 0, 3}, extractFieldValues[int64](t, frame.Fields[2]))
|
||||
|
||||
// label=foo
|
||||
assert.Equal(t, "value", frame.Fields[3].Name)
|
||||
assert.Equal(t, data.Labels{"label": "foo"}, frame.Fields[3].Labels)
|
||||
assert.Equal(t, []int64{1, 0, 0}, extractFieldValues[int64](t, frame.Fields[3]))
|
||||
}
|
||||
|
||||
func extractFieldValues[T any](t *testing.T, field *data.Field) []T {
|
||||
t.Helper()
|
||||
|
||||
values := make([]T, 0, field.Len())
|
||||
for i := 0; i < cap(values); i++ {
|
||||
values = append(values, field.CopyAt(i).(T))
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
type errReader struct {
|
||||
array.RecordReader
|
||||
err error
|
||||
}
|
||||
|
||||
func (r errReader) Err() error {
|
||||
return r.err
|
||||
}
|
||||
|
||||
func TestNewFrame(t *testing.T) {
|
||||
schema := arrow.NewSchema([]arrow.Field{
|
||||
{
|
||||
Name: "name",
|
||||
Type: &arrow.StringType{},
|
||||
Nullable: false,
|
||||
Metadata: arrow.NewMetadata(nil, nil),
|
||||
},
|
||||
{
|
||||
Name: "time",
|
||||
Type: &arrow.TimestampType{},
|
||||
Nullable: false,
|
||||
Metadata: arrow.NewMetadata(nil, nil),
|
||||
},
|
||||
{
|
||||
Name: "extra",
|
||||
Type: &arrow.Int64Type{},
|
||||
Nullable: true,
|
||||
Metadata: arrow.NewMetadata(nil, nil),
|
||||
},
|
||||
}, nil)
|
||||
|
||||
actual := newFrame(schema)
|
||||
expected := &data.Frame{
|
||||
Fields: []*data.Field{
|
||||
data.NewField("name", nil, []string{}),
|
||||
data.NewField("time", nil, []time.Time{}),
|
||||
data.NewField("extra", nil, []*int64{}),
|
||||
},
|
||||
}
|
||||
if !cmp.Equal(expected, actual, cmp.Comparer(cmpFrame)) {
|
||||
log.Fatalf(cmp.Diff(expected, actual))
|
||||
}
|
||||
}
|
||||
|
||||
func cmpFrame(a, b data.Frame) bool {
|
||||
if len(a.Fields) != len(b.Fields) {
|
||||
return false
|
||||
}
|
||||
for i := 0; i < len(a.Fields); i++ {
|
||||
if a.Fields[i].Name != b.Fields[i].Name {
|
||||
return false
|
||||
}
|
||||
if a.Fields[i].Nullable() != b.Fields[i].Nullable() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func TestCopyData_String(t *testing.T) {
|
||||
field := data.NewField("field", nil, []string{})
|
||||
builder := array.NewStringBuilder(memory.DefaultAllocator)
|
||||
builder.Append("joe")
|
||||
builder.Append("john")
|
||||
builder.Append("jackie")
|
||||
err := copyData(field, builder.NewArray())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "joe", field.CopyAt(0))
|
||||
assert.Equal(t, "john", field.CopyAt(1))
|
||||
assert.Equal(t, "jackie", field.CopyAt(2))
|
||||
|
||||
field = data.NewField("field", nil, []*string{})
|
||||
builder = array.NewStringBuilder(memory.DefaultAllocator)
|
||||
builder.Append("joe")
|
||||
builder.AppendNull()
|
||||
builder.Append("jackie")
|
||||
err = copyData(field, builder.NewArray())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "joe", *(field.CopyAt(0).(*string)))
|
||||
assert.Equal(t, (*string)(nil), field.CopyAt(1))
|
||||
assert.Equal(t, "jackie", *(field.CopyAt(2).(*string)))
|
||||
}
|
||||
|
||||
func TestCopyData_Timestamp(t *testing.T) {
|
||||
start, _ := time.Parse(time.RFC3339, "2023-01-01T01:01:01Z")
|
||||
|
||||
field := data.NewField("field", nil, []time.Time{})
|
||||
builder := array.NewTimestampBuilder(memory.DefaultAllocator, &arrow.TimestampType{})
|
||||
builder.Append(arrow.Timestamp(start.Add(time.Hour).UnixNano()))
|
||||
builder.Append(arrow.Timestamp(start.Add(2 * time.Hour).UnixNano()))
|
||||
builder.Append(arrow.Timestamp(start.Add(3 * time.Hour).UnixNano()))
|
||||
err := copyData(field, builder.NewArray())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, start.Add(time.Hour), field.CopyAt(0))
|
||||
assert.Equal(t, start.Add(2*time.Hour), field.CopyAt(1))
|
||||
assert.Equal(t, start.Add(3*time.Hour), field.CopyAt(2))
|
||||
|
||||
field = data.NewField("field", nil, []*time.Time{})
|
||||
builder = array.NewTimestampBuilder(memory.DefaultAllocator, &arrow.TimestampType{})
|
||||
builder.Append(arrow.Timestamp(start.Add(time.Hour).UnixNano()))
|
||||
builder.AppendNull()
|
||||
builder.Append(arrow.Timestamp(start.Add(3 * time.Hour).UnixNano()))
|
||||
err = copyData(field, builder.NewArray())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, start.Add(time.Hour), *field.CopyAt(0).(*time.Time))
|
||||
assert.Equal(t, (*time.Time)(nil), field.CopyAt(1))
|
||||
assert.Equal(t, start.Add(3*time.Hour), *field.CopyAt(2).(*time.Time))
|
||||
}
|
||||
|
||||
func TestCopyData_Boolean(t *testing.T) {
|
||||
field := data.NewField("field", nil, []bool{})
|
||||
builder := array.NewBooleanBuilder(memory.DefaultAllocator)
|
||||
builder.Append(true)
|
||||
builder.Append(false)
|
||||
builder.Append(true)
|
||||
err := copyData(field, builder.NewArray())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, true, field.CopyAt(0))
|
||||
assert.Equal(t, false, field.CopyAt(1))
|
||||
assert.Equal(t, true, field.CopyAt(2))
|
||||
|
||||
field = data.NewField("field", nil, []*bool{})
|
||||
builder = array.NewBooleanBuilder(memory.DefaultAllocator)
|
||||
builder.Append(true)
|
||||
builder.AppendNull()
|
||||
builder.Append(true)
|
||||
err = copyData(field, builder.NewArray())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, true, *field.CopyAt(0).(*bool))
|
||||
assert.Equal(t, (*bool)(nil), field.CopyAt(1))
|
||||
assert.Equal(t, true, *field.CopyAt(2).(*bool))
|
||||
}
|
||||
|
||||
func TestCopyData_Int64(t *testing.T) {
|
||||
field := data.NewField("field", nil, []int64{})
|
||||
builder := array.NewInt64Builder(memory.DefaultAllocator)
|
||||
builder.Append(1)
|
||||
builder.Append(2)
|
||||
builder.Append(3)
|
||||
err := copyData(field, builder.NewArray())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(1), field.CopyAt(0))
|
||||
assert.Equal(t, int64(2), field.CopyAt(1))
|
||||
assert.Equal(t, int64(3), field.CopyAt(2))
|
||||
|
||||
field = data.NewField("field", nil, []*int64{})
|
||||
builder = array.NewInt64Builder(memory.DefaultAllocator)
|
||||
builder.Append(1)
|
||||
builder.AppendNull()
|
||||
builder.Append(3)
|
||||
arr := builder.NewArray()
|
||||
err = copyData(field, arr)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(1), *field.CopyAt(0).(*int64))
|
||||
assert.Equal(t, (*int64)(nil), field.CopyAt(1))
|
||||
assert.Equal(t, int64(3), *field.CopyAt(2).(*int64))
|
||||
}
|
||||
|
||||
func TestCopyData_Float64(t *testing.T) {
|
||||
field := data.NewField("field", nil, []float64{})
|
||||
builder := array.NewFloat64Builder(memory.DefaultAllocator)
|
||||
builder.Append(1.1)
|
||||
builder.Append(2.2)
|
||||
builder.Append(3.3)
|
||||
err := copyData(field, builder.NewArray())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, float64(1.1), field.CopyAt(0))
|
||||
assert.Equal(t, float64(2.2), field.CopyAt(1))
|
||||
assert.Equal(t, float64(3.3), field.CopyAt(2))
|
||||
|
||||
field = data.NewField("field", nil, []*float64{})
|
||||
builder = array.NewFloat64Builder(memory.DefaultAllocator)
|
||||
builder.Append(1.1)
|
||||
builder.AppendNull()
|
||||
builder.Append(3.3)
|
||||
err = copyData(field, builder.NewArray())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, float64(1.1), *field.CopyAt(0).(*float64))
|
||||
assert.Equal(t, (*float64)(nil), field.CopyAt(1))
|
||||
assert.Equal(t, float64(3.3), *field.CopyAt(2).(*float64))
|
||||
}
|
||||
|
||||
func TestCustomMetadata(t *testing.T) {
|
||||
schema := arrow.NewSchema([]arrow.Field{
|
||||
{
|
||||
Name: "int64",
|
||||
Type: &arrow.Int64Type{},
|
||||
Nullable: true,
|
||||
Metadata: arrow.NewMetadata(nil, nil),
|
||||
},
|
||||
}, nil)
|
||||
i64s, _, err := array.FromJSON(
|
||||
memory.DefaultAllocator,
|
||||
arrow.PrimitiveTypes.Int64,
|
||||
strings.NewReader(`[1, 2, 3]`),
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
|
||||
record := array.NewRecord(schema, []arrow.Array{i64s}, -1)
|
||||
records := []arrow.Record{record}
|
||||
reader, err := array.NewRecordReader(schema, records)
|
||||
assert.NoError(t, err)
|
||||
|
||||
md := metadata.MD{}
|
||||
md.Set("trace-id", "abc")
|
||||
md.Set("trace-sampled", "true")
|
||||
query := sqlutil.Query{
|
||||
Format: sqlutil.FormatOptionTable,
|
||||
}
|
||||
resp := newQueryDataResponse(errReader{RecordReader: reader}, query, md)
|
||||
assert.NoError(t, resp.Error)
|
||||
|
||||
assert.Equal(t, map[string]any{
|
||||
"headers": metadata.MD{
|
||||
"trace-id": []string{"abc"},
|
||||
"trace-sampled": []string{"true"},
|
||||
},
|
||||
}, resp.Frames[0].Meta.Custom)
|
||||
}
|
117
pkg/tsdb/influxdb/fsql/client.go
Normal file
117
pkg/tsdb/influxdb/fsql/client.go
Normal file
@ -0,0 +1,117 @@
|
||||
package fsql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/apache/arrow/go/v12/arrow/flight"
|
||||
"github.com/apache/arrow/go/v12/arrow/flight/flightsql"
|
||||
"github.com/apache/arrow/go/v12/arrow/ipc"
|
||||
"github.com/apache/arrow/go/v12/arrow/memory"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
type client struct {
|
||||
*flightsql.Client
|
||||
md metadata.MD
|
||||
}
|
||||
|
||||
// FlightClient returns the underlying [flight.Client].
|
||||
func (c *client) FlightClient() flight.Client {
|
||||
return c.Client.Client
|
||||
}
|
||||
|
||||
func newFlightSQLClient(addr string, metadata metadata.MD, secure bool) (*client, error) {
|
||||
dialOptions, err := grpcDialOptions(secure)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("grpc dial options: %s", err)
|
||||
}
|
||||
fsqlClient, err := flightsql.NewClient(addr, nil, nil, dialOptions...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &client{Client: fsqlClient, md: metadata}, nil
|
||||
}
|
||||
|
||||
func grpcDialOptions(secure bool) ([]grpc.DialOption, error) {
|
||||
transport := grpc.WithTransportCredentials(insecure.NewCredentials())
|
||||
if secure {
|
||||
pool, err := x509.SystemCertPool()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("x509: %s", err)
|
||||
}
|
||||
transport = grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(pool, ""))
|
||||
}
|
||||
|
||||
opts := []grpc.DialOption{
|
||||
transport,
|
||||
}
|
||||
|
||||
return opts, nil
|
||||
}
|
||||
|
||||
// DoGetWithHeaderExtraction performs a normal DoGet, but wraps the stream in a
|
||||
// mechanism that extracts headers when they become available. At least one
|
||||
// record should be read from the *flightReader before the headers are
|
||||
// available.
|
||||
func (c *client) DoGetWithHeaderExtraction(ctx context.Context, in *flight.Ticket, opts ...grpc.CallOption) (*flightReader, error) {
|
||||
stream, err := c.Client.Client.DoGet(ctx, in, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newFlightReader(stream, c.Client.Alloc)
|
||||
}
|
||||
|
||||
// flightReader wraps a [flight.Reader] to expose the headers captured when the
|
||||
// first read occurs on the stream.
|
||||
type flightReader struct {
|
||||
*flight.Reader
|
||||
extractor *headerExtractor
|
||||
}
|
||||
|
||||
// newFlightReader returns a [flightReader].
|
||||
func newFlightReader(stream flight.FlightService_DoGetClient, alloc memory.Allocator) (*flightReader, error) {
|
||||
extractor := &headerExtractor{stream: stream}
|
||||
reader, err := flight.NewRecordReader(extractor, ipc.WithAllocator(alloc))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &flightReader{
|
||||
Reader: reader,
|
||||
extractor: extractor,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Header returns the extracted headers if they exist.
|
||||
func (s *flightReader) Header() (metadata.MD, error) {
|
||||
return s.extractor.Header()
|
||||
}
|
||||
|
||||
// headerExtractor collects the stream's headers on the first call to
|
||||
// [(*headerExtractor).Recv].
|
||||
type headerExtractor struct {
|
||||
stream flight.FlightService_DoGetClient
|
||||
|
||||
once sync.Once
|
||||
header metadata.MD
|
||||
err error
|
||||
}
|
||||
|
||||
// Header returns the extracted headers if they exist.
|
||||
func (s *headerExtractor) Header() (metadata.MD, error) {
|
||||
return s.header, s.err
|
||||
}
|
||||
|
||||
// Recv reads from the stream. The first invocation will capture the headers.
|
||||
func (s *headerExtractor) Recv() (*flight.FlightData, error) {
|
||||
data, err := s.stream.Recv()
|
||||
s.once.Do(func() {
|
||||
s.header, s.err = s.stream.Header()
|
||||
})
|
||||
return data, err
|
||||
}
|
98
pkg/tsdb/influxdb/fsql/flightsql_test.go
Normal file
98
pkg/tsdb/influxdb/fsql/flightsql_test.go
Normal file
@ -0,0 +1,98 @@
|
||||
package fsql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/apache/arrow/go/v12/arrow/flight"
|
||||
"github.com/apache/arrow/go/v12/arrow/flight/flightsql"
|
||||
"github.com/apache/arrow/go/v12/arrow/flight/flightsql/example"
|
||||
"github.com/apache/arrow/go/v12/arrow/memory"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
|
||||
)
|
||||
|
||||
func TestIntegration_QueryData(t *testing.T) {
|
||||
db, err := example.CreateDB()
|
||||
require.NoError(t, err)
|
||||
defer func(db *sql.DB) {
|
||||
err := db.Close()
|
||||
assert.NoError(t, err)
|
||||
}(db)
|
||||
|
||||
sqliteServer, err := example.NewSQLiteFlightSQLServer(db)
|
||||
require.NoError(t, err)
|
||||
sqliteServer.Alloc = memory.NewCheckedAllocator(memory.DefaultAllocator)
|
||||
server := flight.NewServerWithMiddleware(nil)
|
||||
server.RegisterFlightService(flightsql.NewFlightServer(sqliteServer))
|
||||
err = server.Init("localhost:12345")
|
||||
require.NoError(t, err)
|
||||
go func() {
|
||||
err := server.Serve()
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
defer server.Shutdown()
|
||||
|
||||
resp, err := Query(
|
||||
context.Background(),
|
||||
&models.DatasourceInfo{
|
||||
HTTPClient: nil,
|
||||
Token: "secret",
|
||||
URL: "http://localhost:12345",
|
||||
DbName: "influxdb",
|
||||
Version: "test",
|
||||
HTTPMode: "proxy",
|
||||
Metadata: []map[string]string{
|
||||
{
|
||||
"bucket": "bucket",
|
||||
},
|
||||
},
|
||||
SecureGrpc: false,
|
||||
},
|
||||
backend.QueryDataRequest{
|
||||
Queries: []backend.DataQuery{
|
||||
{
|
||||
RefID: "A",
|
||||
JSON: mustQueryJSON(t, "A", "select * from intTable"),
|
||||
},
|
||||
{
|
||||
RefID: "B",
|
||||
JSON: mustQueryJSON(t, "B", "select 1"),
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, resp.Responses, 2)
|
||||
|
||||
respA := resp.Responses["A"]
|
||||
require.NoError(t, respA.Error)
|
||||
frame := respA.Frames[0]
|
||||
|
||||
require.Equal(t, "id", frame.Fields[0].Name)
|
||||
require.Equal(t, "keyName", frame.Fields[1].Name)
|
||||
require.Equal(t, "value", frame.Fields[2].Name)
|
||||
require.Equal(t, "foreignId", frame.Fields[3].Name)
|
||||
for _, f := range frame.Fields {
|
||||
assert.Equal(t, 4, f.Len())
|
||||
}
|
||||
}
|
||||
|
||||
func mustQueryJSON(t *testing.T, refID, sql string) []byte {
|
||||
t.Helper()
|
||||
|
||||
b, err := json.Marshal(queryRequest{
|
||||
RefID: refID,
|
||||
RawQuery: sql,
|
||||
Format: "table",
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return b
|
||||
}
|
126
pkg/tsdb/influxdb/fsql/fsql.go
Normal file
126
pkg/tsdb/influxdb/fsql/fsql.go
Normal file
@ -0,0 +1,126 @@
|
||||
package fsql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
|
||||
)
|
||||
|
||||
var (
|
||||
glog = log.New("tsdb.influx_flightsql")
|
||||
)
|
||||
|
||||
type SQLOptions struct {
|
||||
Addr string `json:"host"`
|
||||
Metadata []map[string]string `json:"metadata"`
|
||||
Token string `json:"token"`
|
||||
}
|
||||
|
||||
func Query(ctx context.Context, dsInfo *models.DatasourceInfo, req backend.QueryDataRequest) (
|
||||
*backend.QueryDataResponse, error) {
|
||||
logger := glog.FromContext(ctx)
|
||||
tRes := backend.NewQueryDataResponse()
|
||||
r, err := runnerFromDataSource(dsInfo)
|
||||
if err != nil {
|
||||
return tRes, err
|
||||
}
|
||||
defer func(client *client) {
|
||||
err := client.Close()
|
||||
if err != nil {
|
||||
logger.Warn("Failed to close fsql client", "err", err)
|
||||
}
|
||||
}(r.client)
|
||||
|
||||
if r.client.md.Len() != 0 {
|
||||
ctx = metadata.NewOutgoingContext(ctx, r.client.md)
|
||||
}
|
||||
|
||||
for _, q := range req.Queries {
|
||||
qm, err := getQueryModel(q)
|
||||
if err != nil {
|
||||
tRes.Responses[q.RefID] = backend.ErrDataResponse(backend.StatusInternal, "bad request")
|
||||
continue
|
||||
}
|
||||
|
||||
info, err := r.client.Execute(ctx, qm.RawSQL)
|
||||
if err != nil {
|
||||
tRes.Responses[q.RefID] = backend.ErrDataResponse(backend.StatusInternal, fmt.Sprintf("flightsql: %s", err))
|
||||
return tRes, nil
|
||||
}
|
||||
if len(info.Endpoint) != 1 {
|
||||
tRes.Responses[q.RefID] = backend.ErrDataResponse(backend.StatusInternal, fmt.Sprintf("unsupported endpoint count in response: %d", len(info.Endpoint)))
|
||||
return tRes, nil
|
||||
}
|
||||
|
||||
reader, err := r.client.DoGetWithHeaderExtraction(ctx, info.Endpoint[0].Ticket)
|
||||
if err != nil {
|
||||
tRes.Responses[q.RefID] = backend.ErrDataResponse(backend.StatusInternal, fmt.Sprintf("flightsql: %s", err))
|
||||
return tRes, nil
|
||||
}
|
||||
defer reader.Release()
|
||||
|
||||
headers, err := reader.Header()
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to extract headers: %s", err))
|
||||
}
|
||||
|
||||
tRes.Responses[q.RefID] = newQueryDataResponse(reader, *qm.Query, headers)
|
||||
}
|
||||
|
||||
return tRes, nil
|
||||
}
|
||||
|
||||
type runner struct {
|
||||
client *client
|
||||
}
|
||||
|
||||
// runnerFromDataSource creates a runner from the datasource model (the datasource instance's configuration).
|
||||
func runnerFromDataSource(dsInfo *models.DatasourceInfo) (*runner, error) {
|
||||
if dsInfo.URL == "" {
|
||||
return nil, fmt.Errorf("missing URL from datasource configuration")
|
||||
}
|
||||
|
||||
u, err := url.Parse(dsInfo.URL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("bad URL : %s", err)
|
||||
}
|
||||
|
||||
host, port, err := net.SplitHostPort(u.Host)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("bad URL : %s", err)
|
||||
}
|
||||
addr := strings.Join([]string{host, port}, ":")
|
||||
|
||||
md := metadata.MD{}
|
||||
for _, m := range dsInfo.Metadata {
|
||||
for k, v := range m {
|
||||
if _, ok := md[k]; ok {
|
||||
return nil, fmt.Errorf("metadata: duplicate key: %s", k)
|
||||
}
|
||||
if k != "" {
|
||||
md.Set(k, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if dsInfo.Token != "" {
|
||||
md.Set("Authorization", fmt.Sprintf("Bearer %s", dsInfo.Token))
|
||||
}
|
||||
|
||||
fsqlClient, err := newFlightSQLClient(addr, md, dsInfo.SecureGrpc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &runner{
|
||||
client: fsqlClient,
|
||||
}, nil
|
||||
}
|
109
pkg/tsdb/influxdb/fsql/macro.go
Normal file
109
pkg/tsdb/influxdb/fsql/macro.go
Normal file
@ -0,0 +1,109 @@
|
||||
package fsql
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
|
||||
)
|
||||
|
||||
var macros = sqlutil.Macros{
|
||||
"dateBin": macroDateBin(""),
|
||||
"dateBinAlias": macroDateBin("_binned"),
|
||||
"interval": macroInterval,
|
||||
"timeGroup": macroTimeGroup,
|
||||
"timeGroupAlias": macroTimeGroupAlias,
|
||||
|
||||
// The behaviors of timeFrom and timeTo as defined in the SDK are different
|
||||
// from all other Grafana SQL plugins. Instead we'll take their
|
||||
// implementations, rename them and define timeFrom and timeTo ourselves.
|
||||
"timeRangeFrom": sqlutil.DefaultMacros["timeFrom"],
|
||||
"timeRangeTo": sqlutil.DefaultMacros["timeTo"],
|
||||
"timeRange": sqlutil.DefaultMacros["timeFilter"],
|
||||
"timeTo": macroTo,
|
||||
"timeFrom": macroFrom,
|
||||
}
|
||||
|
||||
func macroTimeGroup(query *sqlutil.Query, args []string) (string, error) {
|
||||
if len(args) != 2 {
|
||||
return "", fmt.Errorf("%w: expected 1 argument, received %d", sqlutil.ErrorBadArgumentCount, len(args))
|
||||
}
|
||||
|
||||
column := args[0]
|
||||
|
||||
res := ""
|
||||
switch args[1] {
|
||||
case "minute":
|
||||
res += fmt.Sprintf("datepart('minute', %s),", column)
|
||||
fallthrough
|
||||
case "hour":
|
||||
res += fmt.Sprintf("datepart('hour', %s),", column)
|
||||
fallthrough
|
||||
case "day":
|
||||
res += fmt.Sprintf("datepart('day', %s),", column)
|
||||
fallthrough
|
||||
case "month":
|
||||
res += fmt.Sprintf("datepart('month', %s),", column)
|
||||
fallthrough
|
||||
case "year":
|
||||
res += fmt.Sprintf("datepart('year', %s)", column)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func macroTimeGroupAlias(query *sqlutil.Query, args []string) (string, error) {
|
||||
if len(args) != 2 {
|
||||
return "", fmt.Errorf("%w: expected 1 argument, received %d", sqlutil.ErrorBadArgumentCount, len(args))
|
||||
}
|
||||
|
||||
column := args[0]
|
||||
|
||||
res := ""
|
||||
switch args[1] {
|
||||
case "minute":
|
||||
res += fmt.Sprintf("datepart('minute', %s) as %s_minute,", column, column)
|
||||
fallthrough
|
||||
case "hour":
|
||||
res += fmt.Sprintf("datepart('hour', %s) as %s_hour,", column, column)
|
||||
fallthrough
|
||||
case "day":
|
||||
res += fmt.Sprintf("datepart('day', %s) as %s_day,", column, column)
|
||||
fallthrough
|
||||
case "month":
|
||||
res += fmt.Sprintf("datepart('month', %s) as %s_month,", column, column)
|
||||
fallthrough
|
||||
case "year":
|
||||
res += fmt.Sprintf("datepart('year', %s) as %s_year", column, column)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func macroInterval(query *sqlutil.Query, _ []string) (string, error) {
|
||||
return fmt.Sprintf("interval '%d second'", int64(query.Interval.Seconds())), nil
|
||||
}
|
||||
|
||||
func macroFrom(query *sqlutil.Query, _ []string) (string, error) {
|
||||
return fmt.Sprintf("cast('%s' as timestamp)", query.TimeRange.From.Format(time.RFC3339)), nil
|
||||
}
|
||||
|
||||
func macroTo(query *sqlutil.Query, _ []string) (string, error) {
|
||||
return fmt.Sprintf("cast('%s' as timestamp)", query.TimeRange.To.Format(time.RFC3339)), nil
|
||||
}
|
||||
|
||||
func macroDateBin(suffix string) sqlutil.MacroFunc {
|
||||
return func(query *sqlutil.Query, args []string) (string, error) {
|
||||
if len(args) != 1 {
|
||||
return "", fmt.Errorf("%w: expected 1 argument, received %d", sqlutil.ErrorBadArgumentCount, len(args))
|
||||
}
|
||||
column := args[0]
|
||||
aliasing := func() string {
|
||||
if suffix == "" {
|
||||
return ""
|
||||
}
|
||||
return fmt.Sprintf(" as %s%s", column, suffix)
|
||||
}()
|
||||
return fmt.Sprintf("date_bin(interval '%d second', %s, timestamp '1970-01-01T00:00:00Z')%s", int64(query.Interval.Seconds()), column, aliasing), nil
|
||||
}
|
||||
}
|
75
pkg/tsdb/influxdb/fsql/macro_test.go
Normal file
75
pkg/tsdb/influxdb/fsql/macro_test.go
Normal file
@ -0,0 +1,75 @@
|
||||
package fsql
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMacros(t *testing.T) {
|
||||
from, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z")
|
||||
|
||||
query := sqlutil.Query{
|
||||
TimeRange: backend.TimeRange{
|
||||
From: from,
|
||||
To: from.Add(10 * time.Minute),
|
||||
},
|
||||
Interval: 10 * time.Second,
|
||||
}
|
||||
|
||||
cs := []struct {
|
||||
in string
|
||||
out string
|
||||
}{
|
||||
{
|
||||
in: `select * from x`,
|
||||
out: `select * from x`,
|
||||
},
|
||||
{
|
||||
in: `select date_bin($__interval, time, timestamp '1970-01-01T00:00:00Z')`,
|
||||
out: `select date_bin(interval '10 second', time, timestamp '1970-01-01T00:00:00Z')`,
|
||||
},
|
||||
{
|
||||
in: `select $__dateBin(time)`,
|
||||
out: `select date_bin(interval '10 second', time, timestamp '1970-01-01T00:00:00Z')`,
|
||||
},
|
||||
{
|
||||
in: `select $__dateBinAlias(time)`,
|
||||
out: `select date_bin(interval '10 second', time, timestamp '1970-01-01T00:00:00Z') as time_binned`,
|
||||
},
|
||||
{
|
||||
in: `select * from x where $__timeFilter(time)`,
|
||||
out: `select * from x where time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:10:00Z'`,
|
||||
},
|
||||
{
|
||||
in: `select * from x where $__timeRangeFrom(time)`,
|
||||
out: `select * from x where time >= '2023-01-01T00:00:00Z'`,
|
||||
},
|
||||
{
|
||||
in: `select * from x where $__timeRangeTo(time)`,
|
||||
out: `select * from x where time <= '2023-01-01T00:10:00Z'`,
|
||||
},
|
||||
{
|
||||
in: `select * from x where $__timeRange(time)`,
|
||||
out: `select * from x where time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:10:00Z'`,
|
||||
},
|
||||
{
|
||||
in: `select * from x where time >= $__timeFrom`,
|
||||
out: `select * from x where time >= cast('2023-01-01T00:00:00Z' as timestamp)`,
|
||||
},
|
||||
{
|
||||
in: `select * from x where time < $__timeTo`,
|
||||
out: `select * from x where time < cast('2023-01-01T00:10:00Z' as timestamp)`,
|
||||
},
|
||||
}
|
||||
for _, c := range cs {
|
||||
t.Run(c.in, func(t *testing.T) {
|
||||
sql, err := sqlutil.Interpolate(query.WithSQL(c.in), macros)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c.out, sql)
|
||||
})
|
||||
}
|
||||
}
|
59
pkg/tsdb/influxdb/fsql/query_model.go
Normal file
59
pkg/tsdb/influxdb/fsql/query_model.go
Normal file
@ -0,0 +1,59 @@
|
||||
package fsql
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
|
||||
)
|
||||
|
||||
type queryModel struct {
|
||||
*sqlutil.Query
|
||||
}
|
||||
|
||||
// queryRequest is an inbound query request as part of a batch of queries sent
|
||||
// to [(*FlightSQLDatasource).QueryData].
|
||||
type queryRequest struct {
|
||||
RefID string `json:"refId"`
|
||||
RawQuery string `json:"query"`
|
||||
IntervalMilliseconds int `json:"intervalMs"`
|
||||
MaxDataPoints int64 `json:"maxDataPoints"`
|
||||
Format string `json:"resultFormat"`
|
||||
}
|
||||
|
||||
func getQueryModel(dataQuery backend.DataQuery) (*queryModel, error) {
|
||||
var q queryRequest
|
||||
if err := json.Unmarshal(dataQuery.JSON, &q); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal json: %w", err)
|
||||
}
|
||||
|
||||
var format sqlutil.FormatQueryOption
|
||||
switch q.Format {
|
||||
case "time_series":
|
||||
format = sqlutil.FormatOptionTimeSeries
|
||||
case "table":
|
||||
format = sqlutil.FormatOptionTable
|
||||
default:
|
||||
format = sqlutil.FormatOptionTimeSeries
|
||||
}
|
||||
|
||||
query := &sqlutil.Query{
|
||||
RawSQL: q.RawQuery,
|
||||
RefID: q.RefID,
|
||||
MaxDataPoints: q.MaxDataPoints,
|
||||
Interval: time.Duration(q.IntervalMilliseconds) * time.Millisecond,
|
||||
TimeRange: dataQuery.TimeRange,
|
||||
Format: format,
|
||||
}
|
||||
|
||||
// Process macros and execute the query.
|
||||
sql, err := sqlutil.Interpolate(query, macros)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("macro interpolation: %w", err)
|
||||
}
|
||||
query.RawSQL = sql
|
||||
|
||||
return &queryModel{query}, nil
|
||||
}
|
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/tsdb/influxdb/flux"
|
||||
"github.com/grafana/grafana/pkg/tsdb/influxdb/fsql"
|
||||
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
|
||||
)
|
||||
|
||||
@ -34,6 +35,8 @@ func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthReque
|
||||
return CheckFluxHealth(ctx, dsInfo, req)
|
||||
case influxVersionInfluxQL:
|
||||
return CheckInfluxQLHealth(ctx, dsInfo, s)
|
||||
case influxVersionSQL:
|
||||
return CheckSQLHealth(ctx, dsInfo, req)
|
||||
default:
|
||||
return getHealthCheckMessage(logger, "", errors.New("unknown influx version"))
|
||||
}
|
||||
@ -115,6 +118,41 @@ func CheckInfluxQLHealth(ctx context.Context, dsInfo *models.DatasourceInfo, s *
|
||||
return getHealthCheckMessage(logger, "", errors.New("error connecting influxDB influxQL"))
|
||||
}
|
||||
|
||||
func CheckSQLHealth(ctx context.Context, dsInfo *models.DatasourceInfo, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
|
||||
ds, err := fsql.Query(ctx, dsInfo, backend.QueryDataRequest{
|
||||
PluginContext: req.PluginContext,
|
||||
Queries: []backend.DataQuery{
|
||||
{
|
||||
RefID: refID,
|
||||
JSON: []byte(`{ "query": "select 1", "resultFormat": "table" }`),
|
||||
Interval: 1 * time.Minute,
|
||||
MaxDataPoints: 423,
|
||||
TimeRange: backend.TimeRange{
|
||||
From: time.Now().AddDate(0, 0, -1),
|
||||
To: time.Now(),
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return getHealthCheckMessage(logger, "error performing sql query", err)
|
||||
}
|
||||
|
||||
res := ds.Responses[refID]
|
||||
if res.Error != nil {
|
||||
return &backend.CheckHealthResult{
|
||||
Status: backend.HealthStatusError,
|
||||
Message: fmt.Sprintf("ERROR: %s", res.Error),
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &backend.CheckHealthResult{
|
||||
Status: backend.HealthStatusOk,
|
||||
Message: "OK",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func getHealthCheckMessage(logger log.Logger, message string, err error) (*backend.CheckHealthResult, error) {
|
||||
if err == nil {
|
||||
return &backend.CheckHealthResult{
|
||||
|
@ -14,10 +14,12 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
||||
|
||||
"github.com/grafana/grafana/pkg/tsdb/influxdb/flux"
|
||||
"github.com/grafana/grafana/pkg/tsdb/influxdb/fsql"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tsdb/influxdb/flux"
|
||||
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
|
||||
)
|
||||
|
||||
@ -57,22 +59,27 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading settings: %w", err)
|
||||
}
|
||||
|
||||
httpMode := jsonData.HTTPMode
|
||||
if httpMode == "" {
|
||||
httpMode = "GET"
|
||||
}
|
||||
|
||||
maxSeries := jsonData.MaxSeries
|
||||
if maxSeries == 0 {
|
||||
maxSeries = 1000
|
||||
}
|
||||
|
||||
version := jsonData.Version
|
||||
if version == "" {
|
||||
version = influxVersionInfluxQL
|
||||
}
|
||||
|
||||
database := jsonData.DbName
|
||||
if database == "" {
|
||||
database = settings.Database
|
||||
}
|
||||
|
||||
model := &models.DatasourceInfo{
|
||||
HTTPClient: client,
|
||||
URL: settings.URL,
|
||||
@ -82,7 +89,9 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
|
||||
TimeInterval: jsonData.TimeInterval,
|
||||
DefaultBucket: jsonData.DefaultBucket,
|
||||
Organization: jsonData.Organization,
|
||||
Metadata: jsonData.Metadata,
|
||||
MaxSeries: maxSeries,
|
||||
SecureGrpc: true,
|
||||
Token: settings.DecryptedSecureJSONData["token"],
|
||||
}
|
||||
return model, nil
|
||||
@ -98,9 +107,12 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
||||
return nil, err
|
||||
}
|
||||
version := dsInfo.Version
|
||||
if version == "Flux" {
|
||||
if version == influxVersionFlux {
|
||||
return flux.Query(ctx, dsInfo, *req)
|
||||
}
|
||||
if version == influxVersionSQL {
|
||||
return fsql.Query(ctx, dsInfo, *req)
|
||||
}
|
||||
|
||||
logger.Debug("Making a non-Flux type query")
|
||||
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
|
||||
type DatasourceInfo struct {
|
||||
HTTPClient *http.Client
|
||||
|
||||
Token string
|
||||
URL string
|
||||
|
||||
@ -16,4 +17,9 @@ type DatasourceInfo struct {
|
||||
DefaultBucket string `json:"defaultBucket"`
|
||||
Organization string `json:"organization"`
|
||||
MaxSeries int `json:"maxSeries"`
|
||||
|
||||
// Flight SQL metadata
|
||||
Metadata []map[string]string `json:"metadata"`
|
||||
// FlightSQL grpc connection
|
||||
SecureGrpc bool `json:"secureGrpc"`
|
||||
}
|
||||
|
@ -3,4 +3,5 @@ package influxdb
|
||||
const (
|
||||
influxVersionFlux = "Flux"
|
||||
influxVersionInfluxQL = "InfluxQL"
|
||||
influxVersionSQL = "SQL"
|
||||
)
|
||||
|
@ -4,34 +4,21 @@ import React, { PureComponent } from 'react';
|
||||
import {
|
||||
DataSourcePluginOptionsEditorProps,
|
||||
DataSourceSettings,
|
||||
onUpdateDatasourceJsonDataOption,
|
||||
onUpdateDatasourceJsonDataOptionSelect,
|
||||
onUpdateDatasourceOption,
|
||||
onUpdateDatasourceSecureJsonDataOption,
|
||||
SelectableValue,
|
||||
updateDatasourcePluginJsonDataOption,
|
||||
updateDatasourcePluginResetOption,
|
||||
} from '@grafana/data/src';
|
||||
import {
|
||||
Alert,
|
||||
DataSourceHttpSettings,
|
||||
InfoBox,
|
||||
InlineField,
|
||||
InlineFormLabel,
|
||||
LegacyForms,
|
||||
Select,
|
||||
} from '@grafana/ui/src';
|
||||
import { Alert, DataSourceHttpSettings, InlineField, LegacyForms, Select } from '@grafana/ui/src';
|
||||
import { config } from 'app/core/config';
|
||||
|
||||
import { BROWSER_MODE_DISABLED_MESSAGE } from '../../../constants';
|
||||
import { InfluxOptions, InfluxOptionsV1, InfluxSecureJsonData, InfluxVersion } from '../../../types';
|
||||
import { InfluxOptions, InfluxOptionsV1, InfluxVersion } from '../../../types';
|
||||
|
||||
const { Input, SecretFormField } = LegacyForms;
|
||||
import { InfluxFluxConfig } from './InfluxFluxConfig';
|
||||
import { InfluxInfluxQLConfig } from './InfluxInfluxQLConfig';
|
||||
import { InfluxSqlConfig } from './InfluxSQLConfig';
|
||||
|
||||
const httpModes: SelectableValue[] = [
|
||||
{ label: 'GET', value: 'GET' },
|
||||
{ label: 'POST', value: 'POST' },
|
||||
];
|
||||
const { Input } = LegacyForms;
|
||||
|
||||
const versions: Array<SelectableValue<InfluxVersion>> = [
|
||||
{
|
||||
@ -46,6 +33,15 @@ const versions: Array<SelectableValue<InfluxVersion>> = [
|
||||
},
|
||||
];
|
||||
|
||||
const versionsWithSQL: Array<SelectableValue<InfluxVersion>> = [
|
||||
...versions,
|
||||
{
|
||||
label: 'SQL',
|
||||
value: InfluxVersion.SQL,
|
||||
description: 'Native SQL language. Supported in InfluxDB 3.0',
|
||||
},
|
||||
];
|
||||
|
||||
export type Props = DataSourcePluginOptionsEditorProps<InfluxOptions>;
|
||||
type State = {
|
||||
maxSeries: string | undefined;
|
||||
@ -64,6 +60,11 @@ export class ConfigEditor extends PureComponent<Props, State> {
|
||||
this.htmlPrefix = uniqueId('influxdb-config');
|
||||
}
|
||||
|
||||
versionNotice = {
|
||||
Flux: 'Support for Flux in Grafana is currently in beta',
|
||||
SQL: 'Support for SQL in Grafana is currently in alpha',
|
||||
};
|
||||
|
||||
// 1x
|
||||
onResetPassword = () => {
|
||||
updateDatasourcePluginResetOption(this.props, 'password');
|
||||
@ -98,196 +99,30 @@ export class ConfigEditor extends PureComponent<Props, State> {
|
||||
}
|
||||
};
|
||||
|
||||
renderInflux2x() {
|
||||
const { options } = this.props;
|
||||
const { secureJsonFields } = options;
|
||||
const secureJsonData = (options.secureJsonData || {}) as InfluxSecureJsonData;
|
||||
const { htmlPrefix } = this;
|
||||
|
||||
return (
|
||||
<>
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<InlineFormLabel htmlFor={`${htmlPrefix}-org`} className="width-10">
|
||||
Organization
|
||||
</InlineFormLabel>
|
||||
<div className="width-10">
|
||||
<Input
|
||||
id={`${htmlPrefix}-org`}
|
||||
className="width-20"
|
||||
value={options.jsonData.organization || ''}
|
||||
onChange={onUpdateDatasourceJsonDataOption(this.props, 'organization')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<SecretFormField
|
||||
isConfigured={Boolean(secureJsonFields && secureJsonFields.token)}
|
||||
value={secureJsonData.token || ''}
|
||||
label="Token"
|
||||
aria-label="Token"
|
||||
labelWidth={10}
|
||||
inputWidth={20}
|
||||
onReset={this.onResetToken}
|
||||
onChange={onUpdateDatasourceSecureJsonDataOption(this.props, 'token')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<InlineFormLabel className="width-10">Default Bucket</InlineFormLabel>
|
||||
<div className="width-10">
|
||||
<Input
|
||||
className="width-20"
|
||||
placeholder="default bucket"
|
||||
value={options.jsonData.defaultBucket || ''}
|
||||
onChange={onUpdateDatasourceJsonDataOption(this.props, 'defaultBucket')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<InlineFormLabel
|
||||
className="width-10"
|
||||
tooltip="A lower limit for the auto group by time interval. Recommended to be set to write frequency,
|
||||
for example 1m if your data is written every minute."
|
||||
>
|
||||
Min time interval
|
||||
</InlineFormLabel>
|
||||
<div className="width-10">
|
||||
<Input
|
||||
className="width-10"
|
||||
placeholder="10s"
|
||||
value={options.jsonData.timeInterval || ''}
|
||||
onChange={onUpdateDatasourceJsonDataOption(this.props, 'timeInterval')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</>
|
||||
);
|
||||
getQueryLanguageDropdownValue = (v?: InfluxVersion) => {
|
||||
switch (v) {
|
||||
case InfluxVersion.InfluxQL:
|
||||
return versionsWithSQL[0];
|
||||
case InfluxVersion.Flux:
|
||||
return versionsWithSQL[1];
|
||||
case InfluxVersion.SQL:
|
||||
return versionsWithSQL[2];
|
||||
default:
|
||||
return versionsWithSQL[0];
|
||||
}
|
||||
};
|
||||
|
||||
renderInflux1x() {
|
||||
const { options } = this.props;
|
||||
const { secureJsonFields } = options;
|
||||
const secureJsonData = (options.secureJsonData || {}) as InfluxSecureJsonData;
|
||||
const { htmlPrefix } = this;
|
||||
|
||||
return (
|
||||
<>
|
||||
<InfoBox>
|
||||
<h5>Database Access</h5>
|
||||
<p>
|
||||
Setting the database for this datasource does not deny access to other databases. The InfluxDB query syntax
|
||||
allows switching the database in the query. For example:
|
||||
<code>SHOW MEASUREMENTS ON _internal</code> or
|
||||
<code>SELECT * FROM "_internal".."database" LIMIT 10</code>
|
||||
<br />
|
||||
<br />
|
||||
To support data isolation and security, make sure appropriate permissions are configured in InfluxDB.
|
||||
</p>
|
||||
</InfoBox>
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<InlineFormLabel htmlFor={`${htmlPrefix}-db`} className="width-10">
|
||||
Database
|
||||
</InlineFormLabel>
|
||||
<div className="width-20">
|
||||
<Input
|
||||
id={`${htmlPrefix}-db`}
|
||||
className="width-20"
|
||||
value={options.jsonData.dbName ?? options.database}
|
||||
onChange={(event) => {
|
||||
this.props.onOptionsChange({
|
||||
...options,
|
||||
database: '',
|
||||
jsonData: {
|
||||
...options.jsonData,
|
||||
dbName: event.target.value,
|
||||
},
|
||||
});
|
||||
}}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<InlineFormLabel htmlFor={`${htmlPrefix}-user`} className="width-10">
|
||||
User
|
||||
</InlineFormLabel>
|
||||
<div className="width-10">
|
||||
<Input
|
||||
id={`${htmlPrefix}-user`}
|
||||
className="width-20"
|
||||
value={options.user || ''}
|
||||
onChange={onUpdateDatasourceOption(this.props, 'user')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<SecretFormField
|
||||
isConfigured={Boolean(secureJsonFields && secureJsonFields.password)}
|
||||
value={secureJsonData.password || ''}
|
||||
label="Password"
|
||||
aria-label="Password"
|
||||
labelWidth={10}
|
||||
inputWidth={20}
|
||||
onReset={this.onResetPassword}
|
||||
onChange={onUpdateDatasourceSecureJsonDataOption(this.props, 'password')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<InlineFormLabel
|
||||
htmlFor={`${htmlPrefix}-http-method`}
|
||||
className="width-10"
|
||||
tooltip="You can use either GET or POST HTTP method to query your InfluxDB database. The POST
|
||||
method allows you to perform heavy requests (with a lots of WHERE clause) while the GET method
|
||||
will restrict you and return an error if the query is too large."
|
||||
>
|
||||
HTTP Method
|
||||
</InlineFormLabel>
|
||||
<Select
|
||||
inputId={`${htmlPrefix}-http-method`}
|
||||
className="width-10"
|
||||
value={httpModes.find((httpMode) => httpMode.value === options.jsonData.httpMode)}
|
||||
options={httpModes}
|
||||
defaultValue={options.jsonData.httpMode}
|
||||
onChange={onUpdateDatasourceJsonDataOptionSelect(this.props, 'httpMode')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<InlineFormLabel
|
||||
className="width-10"
|
||||
tooltip="A lower limit for the auto group by time interval. Recommended to be set to write frequency,
|
||||
for example 1m if your data is written every minute."
|
||||
>
|
||||
Min time interval
|
||||
</InlineFormLabel>
|
||||
<div className="width-10">
|
||||
<Input
|
||||
className="width-10"
|
||||
placeholder="10s"
|
||||
value={options.jsonData.timeInterval || ''}
|
||||
onChange={onUpdateDatasourceJsonDataOption(this.props, 'timeInterval')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</>
|
||||
);
|
||||
renderJsonDataOptions() {
|
||||
switch (this.props.options.jsonData.version) {
|
||||
case InfluxVersion.InfluxQL:
|
||||
return <InfluxInfluxQLConfig {...this.props} />;
|
||||
case InfluxVersion.Flux:
|
||||
return <InfluxFluxConfig {...this.props} />;
|
||||
case InfluxVersion.SQL:
|
||||
return <InfluxSqlConfig {...this.props} />;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
render() {
|
||||
@ -303,8 +138,8 @@ export class ConfigEditor extends PureComponent<Props, State> {
|
||||
<Select
|
||||
aria-label="Query language"
|
||||
className="width-30"
|
||||
value={options.jsonData.version === InfluxVersion.Flux ? versions[1] : versions[0]}
|
||||
options={versions}
|
||||
value={this.getQueryLanguageDropdownValue(options.jsonData.version)}
|
||||
options={config.featureToggles.influxdbSqlSupport ? versionsWithSQL : versions}
|
||||
defaultValue={versions[0]}
|
||||
onChange={this.onVersionChanged}
|
||||
/>
|
||||
@ -312,16 +147,15 @@ export class ConfigEditor extends PureComponent<Props, State> {
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{options.jsonData.version === InfluxVersion.Flux && (
|
||||
<InfoBox>
|
||||
<h5>Support for Flux in Grafana is currently in beta</h5>
|
||||
{options.jsonData.version !== InfluxVersion.InfluxQL && (
|
||||
<Alert severity="info" title={this.versionNotice[options.jsonData.version!]}>
|
||||
<p>
|
||||
Please report any issues to: <br />
|
||||
<a href="https://github.com/grafana/grafana/issues/new/choose">
|
||||
https://github.com/grafana/grafana/issues
|
||||
</a>
|
||||
</p>
|
||||
</InfoBox>
|
||||
</Alert>
|
||||
)}
|
||||
|
||||
{isDirectAccess && (
|
||||
@ -342,7 +176,7 @@ export class ConfigEditor extends PureComponent<Props, State> {
|
||||
<div>
|
||||
<h3 className="page-heading">InfluxDB Details</h3>
|
||||
</div>
|
||||
{options.jsonData.version === InfluxVersion.Flux ? this.renderInflux2x() : this.renderInflux1x()}
|
||||
{this.renderJsonDataOptions()}
|
||||
<div className="gf-form-inline">
|
||||
<InlineField
|
||||
labelWidth={20}
|
||||
@ -352,7 +186,7 @@ export class ConfigEditor extends PureComponent<Props, State> {
|
||||
<Input
|
||||
placeholder="1000"
|
||||
type="number"
|
||||
className="width-10"
|
||||
className="width-20"
|
||||
value={this.state.maxSeries}
|
||||
onChange={(event) => {
|
||||
// We duplicate this state so that we allow to write freely inside the input. We don't have
|
||||
|
@ -0,0 +1,90 @@
|
||||
import { uniqueId } from 'lodash';
|
||||
import React from 'react';
|
||||
|
||||
import {
|
||||
DataSourcePluginOptionsEditorProps,
|
||||
onUpdateDatasourceJsonDataOption,
|
||||
onUpdateDatasourceSecureJsonDataOption,
|
||||
updateDatasourcePluginResetOption,
|
||||
} from '@grafana/data';
|
||||
import { InlineFormLabel, LegacyForms } from '@grafana/ui';
|
||||
|
||||
import { InfluxOptions, InfluxSecureJsonData } from '../../../types';
|
||||
|
||||
const { Input, SecretFormField } = LegacyForms;
|
||||
|
||||
export type Props = DataSourcePluginOptionsEditorProps<InfluxOptions, InfluxSecureJsonData>;
|
||||
|
||||
export const InfluxFluxConfig = (props: Props) => {
|
||||
const {
|
||||
options: { jsonData, secureJsonData, secureJsonFields },
|
||||
} = props;
|
||||
const htmlPrefix = uniqueId('influxdb-flux-config');
|
||||
|
||||
return (
|
||||
<>
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<InlineFormLabel htmlFor={`${htmlPrefix}-org`} className="width-10">
|
||||
Organization
|
||||
</InlineFormLabel>
|
||||
<div className="width-10">
|
||||
<Input
|
||||
id={`${htmlPrefix}-org`}
|
||||
className="width-20"
|
||||
value={jsonData.organization || ''}
|
||||
onChange={onUpdateDatasourceJsonDataOption(props, 'organization')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<SecretFormField
|
||||
isConfigured={Boolean(secureJsonFields && secureJsonFields.token)}
|
||||
value={secureJsonData?.token || ''}
|
||||
label="Token"
|
||||
aria-label="Token"
|
||||
labelWidth={10}
|
||||
inputWidth={20}
|
||||
onReset={() => updateDatasourcePluginResetOption(props, 'token')}
|
||||
onChange={onUpdateDatasourceSecureJsonDataOption(props, 'token')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<InlineFormLabel className="width-10">Default Bucket</InlineFormLabel>
|
||||
<div className="width-10">
|
||||
<Input
|
||||
className="width-20"
|
||||
placeholder="default bucket"
|
||||
value={jsonData.defaultBucket || ''}
|
||||
onChange={onUpdateDatasourceJsonDataOption(props, 'defaultBucket')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<InlineFormLabel
|
||||
className="width-10"
|
||||
tooltip="A lower limit for the auto group by time interval. Recommended to be set to write frequency,
|
||||
for example 1m if your data is written every minute."
|
||||
>
|
||||
Min time interval
|
||||
</InlineFormLabel>
|
||||
<div className="width-10">
|
||||
<Input
|
||||
className="width-20"
|
||||
placeholder="10s"
|
||||
value={jsonData.timeInterval || ''}
|
||||
onChange={onUpdateDatasourceJsonDataOption(props, 'timeInterval')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</>
|
||||
);
|
||||
};
|
@ -0,0 +1,140 @@
|
||||
import { uniqueId } from 'lodash';
|
||||
import React from 'react';
|
||||
|
||||
import {
|
||||
DataSourcePluginOptionsEditorProps,
|
||||
onUpdateDatasourceJsonDataOption,
|
||||
onUpdateDatasourceJsonDataOptionSelect,
|
||||
onUpdateDatasourceOption,
|
||||
onUpdateDatasourceSecureJsonDataOption,
|
||||
SelectableValue,
|
||||
updateDatasourcePluginResetOption,
|
||||
} from '@grafana/data';
|
||||
import { Alert, InlineFormLabel, LegacyForms, Select } from '@grafana/ui';
|
||||
|
||||
import { InfluxOptions, InfluxSecureJsonData } from '../../../types';
|
||||
|
||||
const { Input, SecretFormField } = LegacyForms;
|
||||
|
||||
const httpModes: SelectableValue[] = [
|
||||
{ label: 'GET', value: 'GET' },
|
||||
{ label: 'POST', value: 'POST' },
|
||||
];
|
||||
|
||||
export type Props = DataSourcePluginOptionsEditorProps<InfluxOptions, InfluxSecureJsonData>;
|
||||
|
||||
export const InfluxInfluxQLConfig = (props: Props) => {
|
||||
const { options, onOptionsChange } = props;
|
||||
const { database, jsonData, secureJsonData, secureJsonFields } = options;
|
||||
const htmlPrefix = uniqueId('influxdb-influxql-config');
|
||||
|
||||
return (
|
||||
<>
|
||||
<Alert severity="info" title="Database Access">
|
||||
<p>
|
||||
Setting the database for this datasource does not deny access to other databases. The InfluxDB query syntax
|
||||
allows switching the database in the query. For example:
|
||||
<code>SHOW MEASUREMENTS ON _internal</code> or
|
||||
<code>SELECT * FROM "_internal".."database" LIMIT 10</code>
|
||||
<br />
|
||||
<br />
|
||||
To support data isolation and security, make sure appropriate permissions are configured in InfluxDB.
|
||||
</p>
|
||||
</Alert>
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<InlineFormLabel htmlFor={`${htmlPrefix}-db`} className="width-10">
|
||||
Database
|
||||
</InlineFormLabel>
|
||||
<div className="width-20">
|
||||
<Input
|
||||
id={`${htmlPrefix}-db`}
|
||||
className="width-20"
|
||||
value={jsonData.dbName ?? database}
|
||||
onChange={(event) => {
|
||||
onOptionsChange({
|
||||
...options,
|
||||
database: '',
|
||||
jsonData: {
|
||||
...jsonData,
|
||||
dbName: event.target.value,
|
||||
},
|
||||
});
|
||||
}}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<InlineFormLabel htmlFor={`${htmlPrefix}-user`} className="width-10">
|
||||
User
|
||||
</InlineFormLabel>
|
||||
<div className="width-10">
|
||||
<Input
|
||||
id={`${htmlPrefix}-user`}
|
||||
className="width-20"
|
||||
value={options.user || ''}
|
||||
onChange={onUpdateDatasourceOption(props, 'user')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<SecretFormField
|
||||
isConfigured={Boolean(secureJsonFields && secureJsonFields.password)}
|
||||
value={secureJsonData?.password || ''}
|
||||
label="Password"
|
||||
aria-label="Password"
|
||||
labelWidth={10}
|
||||
inputWidth={20}
|
||||
onReset={() => updateDatasourcePluginResetOption(props, 'password')}
|
||||
onChange={onUpdateDatasourceSecureJsonDataOption(props, 'password')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<InlineFormLabel
|
||||
htmlFor={`${htmlPrefix}-http-method`}
|
||||
className="width-10"
|
||||
tooltip="You can use either GET or POST HTTP method to query your InfluxDB database. The POST
|
||||
method allows you to perform heavy requests (with a lots of WHERE clause) while the GET method
|
||||
will restrict you and return an error if the query is too large."
|
||||
>
|
||||
HTTP Method
|
||||
</InlineFormLabel>
|
||||
<Select
|
||||
inputId={`${htmlPrefix}-http-method`}
|
||||
className="width-20"
|
||||
value={httpModes.find((httpMode) => httpMode.value === options.jsonData.httpMode)}
|
||||
options={httpModes}
|
||||
defaultValue={options.jsonData.httpMode}
|
||||
onChange={onUpdateDatasourceJsonDataOptionSelect(props, 'httpMode')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div className="gf-form-inline">
|
||||
<div className="gf-form">
|
||||
<InlineFormLabel
|
||||
className="width-10"
|
||||
tooltip="A lower limit for the auto group by time interval. Recommended to be set to write frequency,
|
||||
for example 1m if your data is written every minute."
|
||||
>
|
||||
Min time interval
|
||||
</InlineFormLabel>
|
||||
<div className="width-10">
|
||||
<Input
|
||||
className="width-20"
|
||||
placeholder="10s"
|
||||
value={options.jsonData.timeInterval || ''}
|
||||
onChange={onUpdateDatasourceJsonDataOption(props, 'timeInterval')}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</>
|
||||
);
|
||||
};
|
@ -0,0 +1,138 @@
|
||||
import React, { useEffect, useState } from 'react';
|
||||
|
||||
import {
|
||||
DataSourcePluginOptionsEditorProps,
|
||||
onUpdateDatasourceSecureJsonDataOption,
|
||||
updateDatasourcePluginResetOption,
|
||||
} from '@grafana/data';
|
||||
import { InlineField, SecretInput, Input, InlineFieldRow, InlineLabel } from '@grafana/ui';
|
||||
|
||||
import { InfluxOptions, InfluxSecureJsonData } from '../../../types';
|
||||
|
||||
export type Props = DataSourcePluginOptionsEditorProps<InfluxOptions, InfluxSecureJsonData>;
|
||||
|
||||
type MetadataState = Array<{ key: string; value: string }>;
|
||||
|
||||
export const addMetaData = (setMetaData: (val: MetadataState) => void, metaDataArr: MetadataState) => {
|
||||
setMetaData([...metaDataArr, { key: '', value: '' }]);
|
||||
};
|
||||
|
||||
export const removeMetaData = (i: number, setMetaData: (val: MetadataState) => void, metaDataArr: MetadataState) => {
|
||||
const newMetaValues = [...metaDataArr];
|
||||
newMetaValues.splice(i, 1);
|
||||
setMetaData(newMetaValues);
|
||||
};
|
||||
|
||||
export const onKeyChange = (
|
||||
key: string,
|
||||
metaDataArr: MetadataState,
|
||||
index: number,
|
||||
setMetaData: (val: MetadataState) => void
|
||||
) => {
|
||||
const newMetaValues = [...metaDataArr];
|
||||
newMetaValues[index]['key'] = key;
|
||||
setMetaData(newMetaValues);
|
||||
};
|
||||
|
||||
export const onValueChange = (
|
||||
value: string,
|
||||
metaDataArr: MetadataState,
|
||||
index: number,
|
||||
setMetaData: (val: MetadataState) => void
|
||||
) => {
|
||||
const newMetaValues = [...metaDataArr];
|
||||
newMetaValues[index]['value'] = value;
|
||||
setMetaData(newMetaValues);
|
||||
};
|
||||
|
||||
export const InfluxSqlConfig = (props: Props) => {
|
||||
const {
|
||||
options: { jsonData, secureJsonData, secureJsonFields },
|
||||
} = props;
|
||||
|
||||
const existingMetadata: MetadataState = jsonData?.metadata?.length
|
||||
? jsonData?.metadata?.map((md) => ({ key: Object.keys(md)[0], value: Object.values(md)[0] }))
|
||||
: [{ key: 'bucket-name', value: '' }];
|
||||
const [metaDataArr, setMetaData] = useState<MetadataState>(existingMetadata);
|
||||
|
||||
useEffect(() => {
|
||||
const { onOptionsChange, options } = props;
|
||||
const mapData = metaDataArr?.map((m) => ({ [m.key]: m.value }));
|
||||
const jsonData = {
|
||||
...options.jsonData,
|
||||
metadata: mapData,
|
||||
};
|
||||
onOptionsChange({
|
||||
...options,
|
||||
jsonData,
|
||||
});
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [metaDataArr]);
|
||||
|
||||
return (
|
||||
<div>
|
||||
<div className="gf-form">
|
||||
<h6>Token</h6>
|
||||
</div>
|
||||
<div>
|
||||
<InlineField labelWidth={20} label="Token">
|
||||
<SecretInput
|
||||
width={40}
|
||||
name="token"
|
||||
type="text"
|
||||
value={secureJsonData?.token || ''}
|
||||
onReset={() => updateDatasourcePluginResetOption(props, 'token')}
|
||||
onChange={onUpdateDatasourceSecureJsonDataOption(props, 'token')}
|
||||
isConfigured={secureJsonFields?.token}
|
||||
/>
|
||||
</InlineField>
|
||||
</div>
|
||||
<div>
|
||||
<div className="gf-form">
|
||||
<h6>MetaData</h6>
|
||||
</div>
|
||||
{metaDataArr?.map((_, i) => (
|
||||
<InlineFieldRow key={i} style={{ flexFlow: 'row' }}>
|
||||
<InlineField labelWidth={20} label="Key">
|
||||
<Input
|
||||
key={i}
|
||||
width={40}
|
||||
name="key"
|
||||
type="text"
|
||||
value={metaDataArr[i]?.key || ''}
|
||||
placeholder="key"
|
||||
onChange={(e) => onKeyChange(e.currentTarget.value, metaDataArr, i, setMetaData)}
|
||||
></Input>
|
||||
</InlineField>
|
||||
<InlineField labelWidth={20} label="Value">
|
||||
<Input
|
||||
key={i}
|
||||
width={40}
|
||||
name="value"
|
||||
type="text"
|
||||
value={metaDataArr[i]?.value?.toString() ?? ''}
|
||||
placeholder="value"
|
||||
onChange={(e) => onValueChange(e.currentTarget.value, metaDataArr, i, setMetaData)}
|
||||
></Input>
|
||||
</InlineField>
|
||||
{i + 1 >= metaDataArr.length && (
|
||||
<InlineLabel as="button" className="" onClick={() => addMetaData(setMetaData, metaDataArr)} width="auto">
|
||||
+
|
||||
</InlineLabel>
|
||||
)}
|
||||
{i > 0 && (
|
||||
<InlineLabel
|
||||
as="button"
|
||||
className=""
|
||||
width="auto"
|
||||
onClick={() => removeMetaData(i, setMetaData, metaDataArr)}
|
||||
>
|
||||
-
|
||||
</InlineLabel>
|
||||
)}
|
||||
</InlineFieldRow>
|
||||
))}
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
};
|
@ -5,9 +5,10 @@ import { QueryEditorProps } from '@grafana/data/src';
|
||||
|
||||
import InfluxDatasource from '../../../datasource';
|
||||
import { buildRawQuery } from '../../../queryUtils';
|
||||
import { InfluxOptions, InfluxQuery } from '../../../types';
|
||||
import { InfluxOptions, InfluxQuery, InfluxVersion } from '../../../types';
|
||||
|
||||
import { FluxQueryEditor } from './flux/FluxQueryEditor';
|
||||
import { FSQLEditor } from './fsql/FSQLEditor';
|
||||
import { QueryEditorModeSwitcher } from './influxql/QueryEditorModeSwitcher';
|
||||
import { RawInfluxQLEditor } from './influxql/code/RawInfluxQLEditor';
|
||||
import { VisualInfluxQLEditor as VisualInfluxQLEditor } from './influxql/visual/VisualInfluxQLEditor';
|
||||
@ -15,14 +16,17 @@ import { VisualInfluxQLEditor as VisualInfluxQLEditor } from './influxql/visual/
|
||||
type Props = QueryEditorProps<InfluxDatasource, InfluxQuery, InfluxOptions>;
|
||||
|
||||
export const QueryEditor = ({ query, onChange, onRunQuery, datasource }: Props) => {
|
||||
if (datasource.isFlux) {
|
||||
switch (datasource.version) {
|
||||
case InfluxVersion.Flux:
|
||||
return (
|
||||
<div className="gf-form-query-content">
|
||||
<FluxQueryEditor query={query} onChange={onChange} onRunQuery={onRunQuery} datasource={datasource} />
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
case InfluxVersion.SQL:
|
||||
return <FSQLEditor query={query} onChange={onChange} onRunQuery={onRunQuery} />;
|
||||
case InfluxVersion.InfluxQL:
|
||||
default:
|
||||
return (
|
||||
<div className={css({ display: 'flex' })}>
|
||||
<div className={css({ flexGrow: 1 })}>
|
||||
@ -41,4 +45,5 @@ export const QueryEditor = ({ query, onChange, onRunQuery, datasource }: Props)
|
||||
/>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
};
|
||||
|
@ -0,0 +1,32 @@
|
||||
import React from 'react';
|
||||
|
||||
import { Input } from '@grafana/ui';
|
||||
|
||||
import { InfluxQuery } from '../../../../types';
|
||||
|
||||
type Props = {
|
||||
onChange: (query: InfluxQuery) => void;
|
||||
onRunQuery: () => void;
|
||||
query: InfluxQuery;
|
||||
};
|
||||
|
||||
// Flight SQL Editor
|
||||
export const FSQLEditor = (props: Props) => {
|
||||
const onSQLQueryChange = (query?: string) => {
|
||||
if (query) {
|
||||
props.onChange({ ...props.query, query, resultFormat: 'table' });
|
||||
}
|
||||
props.onRunQuery();
|
||||
};
|
||||
return (
|
||||
<div>
|
||||
<Input
|
||||
value={props.query.query}
|
||||
onBlur={(e) => onSQLQueryChange(e.currentTarget.value)}
|
||||
onChange={(e) => onSQLQueryChange(e.currentTarget.value)}
|
||||
/>
|
||||
<br />
|
||||
<button onClick={() => onSQLQueryChange()}>run query</button>
|
||||
</div>
|
||||
);
|
||||
};
|
@ -3,6 +3,7 @@ import React, { PureComponent } from 'react';
|
||||
import { InlineFormLabel, TextArea } from '@grafana/ui/src';
|
||||
|
||||
import InfluxDatasource from '../../../datasource';
|
||||
import { InfluxVersion } from '../../../types';
|
||||
import { FluxQueryEditor } from '../query/flux/FluxQueryEditor';
|
||||
|
||||
interface Props {
|
||||
@ -18,7 +19,7 @@ export default class VariableQueryEditor extends PureComponent<Props> {
|
||||
|
||||
render() {
|
||||
let { query, datasource, onChange } = this.props;
|
||||
if (datasource.isFlux) {
|
||||
if (datasource.version === InfluxVersion.Flux) {
|
||||
return (
|
||||
<FluxQueryEditor
|
||||
datasource={datasource}
|
||||
|
@ -55,7 +55,7 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
|
||||
access: 'direct' | 'proxy';
|
||||
responseParser: ResponseParser;
|
||||
httpMode: string;
|
||||
isFlux: boolean;
|
||||
version?: InfluxVersion;
|
||||
isProxyAccess: boolean;
|
||||
retentionPolicies: string[];
|
||||
|
||||
@ -81,11 +81,11 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
|
||||
this.interval = settingsData.timeInterval;
|
||||
this.httpMode = settingsData.httpMode || 'GET';
|
||||
this.responseParser = new ResponseParser();
|
||||
this.isFlux = settingsData.version === InfluxVersion.Flux;
|
||||
this.version = settingsData.version ?? InfluxVersion.InfluxQL;
|
||||
this.isProxyAccess = instanceSettings.access === 'proxy';
|
||||
this.retentionPolicies = [];
|
||||
|
||||
if (this.isFlux) {
|
||||
if (this.version === InfluxVersion.Flux) {
|
||||
// When flux, use an annotation processor rather than the `annotationQuery` lifecycle
|
||||
this.annotations = {
|
||||
QueryEditor: FluxQueryEditor,
|
||||
@ -100,9 +100,7 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
|
||||
|
||||
async getRetentionPolicies(): Promise<string[]> {
|
||||
// Only For InfluxQL Mode
|
||||
if (this.isFlux || this.retentionPolicies.length) {
|
||||
return Promise.resolve(this.retentionPolicies);
|
||||
} else {
|
||||
if (this.version === InfluxVersion.InfluxQL && !this.retentionPolicies.length) {
|
||||
return getAllPolicies(this).catch((err) => {
|
||||
console.error(
|
||||
'Unable to fetch retention policies. Queries will be run without specifying retention policy.',
|
||||
@ -111,6 +109,8 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
|
||||
return Promise.resolve(this.retentionPolicies);
|
||||
});
|
||||
}
|
||||
|
||||
return Promise.resolve(this.retentionPolicies);
|
||||
}
|
||||
|
||||
query(request: DataQueryRequest<InfluxQuery>): Observable<DataQueryResponse> {
|
||||
@ -171,7 +171,7 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
|
||||
return merge(...streams);
|
||||
}
|
||||
|
||||
if (this.isFlux) {
|
||||
if (this.version === InfluxVersion.Flux || this.version === InfluxVersion.SQL) {
|
||||
return super.query(filteredRequest);
|
||||
}
|
||||
|
||||
@ -221,7 +221,7 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
|
||||
}
|
||||
|
||||
getQueryDisplayText(query: InfluxQuery) {
|
||||
if (this.isFlux) {
|
||||
if (this.version === InfluxVersion.Flux) {
|
||||
return query.query;
|
||||
}
|
||||
return new InfluxQueryModel(query).render(false);
|
||||
@ -231,7 +231,7 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
|
||||
* Returns false if the query should be skipped
|
||||
*/
|
||||
filterQuery(query: InfluxQuery): boolean {
|
||||
if (this.isFlux) {
|
||||
if (this.version === InfluxVersion.Flux) {
|
||||
return !!query.query;
|
||||
}
|
||||
return true;
|
||||
@ -241,7 +241,7 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
|
||||
// We want to interpolate these variables on backend
|
||||
const { __interval, __interval_ms, ...rest } = scopedVars || {};
|
||||
|
||||
if (this.isFlux) {
|
||||
if (this.version === InfluxVersion.Flux) {
|
||||
return {
|
||||
...query,
|
||||
query: this.templateSrv.replace(query.query ?? '', rest), // The raw query text
|
||||
@ -258,7 +258,7 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
|
||||
targetContainsTemplate(target: InfluxQuery) {
|
||||
// for flux-mode we just take target.query,
|
||||
// for influxql-mode we use InfluxQueryModel to create the text-representation
|
||||
const queryText = this.isFlux ? target.query : buildRawQuery(target);
|
||||
const queryText = this.version === InfluxVersion.Flux ? target.query : buildRawQuery(target);
|
||||
|
||||
return this.templateSrv.containsTemplate(queryText);
|
||||
}
|
||||
@ -269,7 +269,7 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
|
||||
}
|
||||
|
||||
return queries.map((query) => {
|
||||
if (this.isFlux) {
|
||||
if (this.version === InfluxVersion.Flux) {
|
||||
return {
|
||||
...query,
|
||||
datasource: this.getRef(),
|
||||
@ -347,7 +347,7 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
|
||||
}
|
||||
|
||||
async metricFindQuery(query: string, options?: any): Promise<MetricFindValue[]> {
|
||||
if (this.isFlux || this.isMigrationToggleOnAndIsAccessProxy()) {
|
||||
if (this.version === InfluxVersion.Flux || this.isMigrationToggleOnAndIsAccessProxy()) {
|
||||
const target: InfluxQuery = {
|
||||
refId: 'metricFindQuery',
|
||||
query,
|
||||
@ -696,7 +696,7 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
|
||||
}
|
||||
|
||||
async annotationEvents(options: DataQueryRequest, annotation: InfluxQuery): Promise<AnnotationEvent[]> {
|
||||
if (this.isFlux) {
|
||||
if (this.version === InfluxVersion.Flux) {
|
||||
return Promise.reject({
|
||||
message: 'Flux requires the standard annotation query',
|
||||
});
|
||||
|
@ -8,6 +8,7 @@ import { backendSrv } from 'app/core/services/backend_srv'; // will use the vers
|
||||
|
||||
import { BROWSER_MODE_DISABLED_MESSAGE } from '../constants';
|
||||
import InfluxDatasource from '../datasource';
|
||||
import { InfluxVersion } from '../types';
|
||||
|
||||
//@ts-ignore
|
||||
const templateSrv = new TemplateSrvStub();
|
||||
@ -295,7 +296,7 @@ describe('InfluxDataSource', () => {
|
||||
|
||||
describe('when interpolating query variables for dashboard->explore', () => {
|
||||
it('should interpolate all variables with Flux mode', () => {
|
||||
ds.isFlux = true;
|
||||
ds.version = InfluxVersion.Flux;
|
||||
const fluxQuery = {
|
||||
refId: 'x',
|
||||
query: '$interpolationVar,$interpolationVar2',
|
||||
@ -309,7 +310,7 @@ describe('InfluxDataSource', () => {
|
||||
});
|
||||
|
||||
it('should interpolate all variables with InfluxQL mode', () => {
|
||||
ds.isFlux = false;
|
||||
ds.version = InfluxVersion.InfluxQL;
|
||||
const queries = ds.interpolateVariablesInQueries([influxQuery], {
|
||||
interpolationVar: { text: text, value: text },
|
||||
interpolationVar2: { text: text2, value: text2 },
|
||||
@ -320,7 +321,7 @@ describe('InfluxDataSource', () => {
|
||||
|
||||
describe('when interpolating template variables', () => {
|
||||
it('should apply all template variables with Flux mode', () => {
|
||||
ds.isFlux = true;
|
||||
ds.version = InfluxVersion.Flux;
|
||||
const fluxQuery = {
|
||||
refId: 'x',
|
||||
query: '$interpolationVar',
|
||||
@ -336,7 +337,7 @@ describe('InfluxDataSource', () => {
|
||||
});
|
||||
|
||||
it('should apply all template variables with InfluxQL mode', () => {
|
||||
ds.isFlux = false;
|
||||
ds.version = ds.version = InfluxVersion.InfluxQL;
|
||||
ds.access = 'proxy';
|
||||
config.featureToggles.influxdbBackendMigration = true;
|
||||
const query = ds.applyTemplateVariables(influxQuery, {
|
||||
@ -347,7 +348,7 @@ describe('InfluxDataSource', () => {
|
||||
});
|
||||
|
||||
it('should apply all scopedVars to tags', () => {
|
||||
ds.isFlux = false;
|
||||
ds.version = InfluxVersion.InfluxQL;
|
||||
ds.access = 'proxy';
|
||||
config.featureToggles.influxdbBackendMigration = true;
|
||||
const query = ds.applyTemplateVariables(influxQuery, {
|
||||
|
@ -3,6 +3,7 @@ import { AdHocVariableFilter, DataQuery, DataSourceJsonData } from '@grafana/dat
|
||||
export enum InfluxVersion {
|
||||
InfluxQL = 'InfluxQL',
|
||||
Flux = 'Flux',
|
||||
SQL = 'SQL',
|
||||
}
|
||||
|
||||
export interface InfluxOptions extends DataSourceJsonData {
|
||||
@ -17,6 +18,9 @@ export interface InfluxOptions extends DataSourceJsonData {
|
||||
organization?: string;
|
||||
defaultBucket?: string;
|
||||
maxSeries?: number;
|
||||
|
||||
// With SQL
|
||||
metadata?: Array<Record<string, string>>;
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user