InfluxDB: Update SQL language url specification (#79408)

* accept urls with or without port

* remove redundant comment

* remove redundant function
This commit is contained in:
ismail simsek 2023-12-13 16:01:01 +01:00 committed by GitHub
parent 46192d676d
commit 03da1a3f3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 71 additions and 56 deletions

View File

@ -11,76 +11,94 @@ import (
"github.com/apache/arrow/go/v13/arrow/flight/flightsql/example" "github.com/apache/arrow/go/v13/arrow/flight/flightsql/example"
"github.com/apache/arrow/go/v13/arrow/memory" "github.com/apache/arrow/go/v13/arrow/memory"
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models" "github.com/grafana/grafana/pkg/tsdb/influxdb/models"
) )
func TestIntegration_QueryData(t *testing.T) { type FSQLTestSuite struct {
suite.Suite
db *sql.DB
server flight.Server
}
func (suite *FSQLTestSuite) SetupTest() {
db, err := example.CreateDB() db, err := example.CreateDB()
require.NoError(t, err) require.NoError(suite.T(), err)
defer func(db *sql.DB) {
err := db.Close()
assert.NoError(t, err)
}(db)
sqliteServer, err := example.NewSQLiteFlightSQLServer(db) sqliteServer, err := example.NewSQLiteFlightSQLServer(db)
require.NoError(t, err) require.NoError(suite.T(), err)
sqliteServer.Alloc = memory.NewCheckedAllocator(memory.DefaultAllocator) sqliteServer.Alloc = memory.NewCheckedAllocator(memory.DefaultAllocator)
server := flight.NewServerWithMiddleware(nil) server := flight.NewServerWithMiddleware(nil)
server.RegisterFlightService(flightsql.NewFlightServer(sqliteServer)) server.RegisterFlightService(flightsql.NewFlightServer(sqliteServer))
err = server.Init("localhost:12345") err = server.Init("localhost:12345")
require.NoError(t, err) require.NoError(suite.T(), err)
go func() { go func() {
err := server.Serve() err := server.Serve()
assert.NoError(t, err) require.NoError(suite.T(), err)
}() }()
defer server.Shutdown() suite.db = db
suite.server = server
}
resp, err := Query( func (suite *FSQLTestSuite) AfterTest(suiteName, testName string) {
context.Background(), err := suite.db.Close()
&models.DatasourceInfo{ require.NoError(suite.T(), err)
HTTPClient: nil, suite.server.Shutdown()
Token: "secret", }
URL: "http://localhost:12345",
DbName: "influxdb", func TestFSQLTestSuite(t *testing.T) {
Version: "test", suite.Run(t, new(FSQLTestSuite))
HTTPMode: "proxy", }
Metadata: []map[string]string{
{ func (suite *FSQLTestSuite) TestIntegration_QueryData() {
"bucket": "bucket", suite.Run("should run simple query data", func() {
resp, err := Query(
context.Background(),
&models.DatasourceInfo{
HTTPClient: nil,
Token: "secret",
URL: "http://localhost:12345",
DbName: "influxdb",
Version: "test",
HTTPMode: "proxy",
Metadata: []map[string]string{
{
"bucket": "bucket",
},
},
SecureGrpc: false,
},
backend.QueryDataRequest{
Queries: []backend.DataQuery{
{
RefID: "A",
JSON: mustQueryJSON(suite.T(), "A", "select * from intTable"),
},
{
RefID: "B",
JSON: mustQueryJSON(suite.T(), "B", "select 1"),
},
}, },
}, },
SecureGrpc: false, )
},
backend.QueryDataRequest{
Queries: []backend.DataQuery{
{
RefID: "A",
JSON: mustQueryJSON(t, "A", "select * from intTable"),
},
{
RefID: "B",
JSON: mustQueryJSON(t, "B", "select 1"),
},
},
},
)
require.NoError(t, err)
require.Len(t, resp.Responses, 2)
respA := resp.Responses["A"] require.NoError(suite.T(), err)
require.NoError(t, respA.Error) require.Len(suite.T(), resp.Responses, 2)
frame := respA.Frames[0]
require.Equal(t, "id", frame.Fields[0].Name) respA := resp.Responses["A"]
require.Equal(t, "keyName", frame.Fields[1].Name) require.NoError(suite.T(), respA.Error)
require.Equal(t, "value", frame.Fields[2].Name) frame := respA.Frames[0]
require.Equal(t, "foreignId", frame.Fields[3].Name)
for _, f := range frame.Fields { require.Equal(suite.T(), "id", frame.Fields[0].Name)
assert.Equal(t, 4, f.Len()) require.Equal(suite.T(), "keyName", frame.Fields[1].Name)
} require.Equal(suite.T(), "value", frame.Fields[2].Name)
require.Equal(suite.T(), "foreignId", frame.Fields[3].Name)
for _, f := range frame.Fields {
require.Equal(suite.T(), 4, f.Len())
}
})
} }
func mustQueryJSON(t *testing.T, refID, sql string) []byte { func mustQueryJSON(t *testing.T, refID, sql string) []byte {

View File

@ -3,9 +3,7 @@ package fsql
import ( import (
"context" "context"
"fmt" "fmt"
"net"
"net/url" "net/url"
"strings"
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
@ -94,11 +92,10 @@ func runnerFromDataSource(dsInfo *models.DatasourceInfo) (*runner, error) {
return nil, fmt.Errorf("bad URL : %s", err) return nil, fmt.Errorf("bad URL : %s", err)
} }
host, port, err := net.SplitHostPort(u.Host) addr := u.Host
if err != nil { if u.Port() == "" {
return nil, fmt.Errorf("bad URL : %s", err) addr += ":443"
} }
addr := strings.Join([]string{host, port}, ":")
md := metadata.MD{} md := metadata.MD{}
for _, m := range dsInfo.Metadata { for _, m := range dsInfo.Metadata {