diff --git a/pkg/server/module_server.go b/pkg/server/module_server.go index 4f8ba2f32a7..053c62c837e 100644 --- a/pkg/server/module_server.go +++ b/pkg/server/module_server.go @@ -131,7 +131,7 @@ func (s *ModuleServer) Run() error { //} m.RegisterModule(modules.StorageServer, func() (services.Service, error) { - return sql.ProvideService(s.cfg, s.features, nil, s.log) + return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log) }) m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) { diff --git a/pkg/storage/unified/sql/service.go b/pkg/storage/unified/sql/service.go index 8ad9d9bd134..e5420e4ba6a 100644 --- a/pkg/storage/unified/sql/service.go +++ b/pkg/storage/unified/sql/service.go @@ -20,11 +20,14 @@ import ( ) var ( - _ Service = (*service)(nil) + _ UnifiedStorageGrpcService = (*service)(nil) ) -type Service interface { +type UnifiedStorageGrpcService interface { services.NamedService + + // Return the address where this service is running + GetAddress() string } type service struct { @@ -45,12 +48,12 @@ type service struct { log log.Logger } -func ProvideService( +func ProvideUnifiedStorageGrpcService( cfg *setting.Cfg, features featuremgmt.FeatureToggles, db infraDB.DB, log log.Logger, -) (*service, error) { +) (UnifiedStorageGrpcService, error) { tracingCfg, err := tracing.ProvideTracingConfig(cfg) if err != nil { return nil, err diff --git a/pkg/storage/unified/sql/test/integration_test.go b/pkg/storage/unified/sql/test/integration_test.go index e673b79a8ef..dcc655867d6 100644 --- a/pkg/storage/unified/sql/test/integration_test.go +++ b/pkg/storage/unified/sql/test/integration_test.go @@ -328,12 +328,12 @@ func TestClientServer(t *testing.T) { dbstore := infraDB.InitTestDB(t) cfg := setting.NewCfg() - cfg.GRPCServerAddress = "localhost:0" + cfg.GRPCServerAddress = "localhost:0" // get a free address cfg.GRPCServerNetwork = "tcp" features := featuremgmt.WithFeatures() - svc, err := sql.ProvideService(cfg, features, dbstore, nil) + svc, err := sql.ProvideUnifiedStorageGrpcService(cfg, features, dbstore, nil) require.NoError(t, err) var client resource.ResourceStoreClient diff --git a/pkg/tests/apis/playlist/playlist_test.go b/pkg/tests/apis/playlist/playlist_test.go index d9153363f04..b038bdec935 100644 --- a/pkg/tests/apis/playlist/playlist_test.go +++ b/pkg/tests/apis/playlist/playlist_test.go @@ -16,6 +16,7 @@ import ( playlistv0alpha1 "github.com/grafana/grafana/pkg/apis/playlist/v0alpha1" grafanarest "github.com/grafana/grafana/pkg/apiserver/rest" + "github.com/grafana/grafana/pkg/services/apiserver/options" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/playlist" "github.com/grafana/grafana/pkg/setting" @@ -156,7 +157,7 @@ func TestIntegrationPlaylist(t *testing.T) { doPlaylistTests(t, apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{ AppModeProduction: false, // required for unified storage DisableAnonymous: true, - APIServerStorageType: "unified", // use the entity api tables + APIServerStorageType: options.StorageTypeUnified, // use the entity api tables EnableFeatureToggles: []string{ featuremgmt.FlagKubernetesPlaylists, // Required so that legacy calls are also written }, @@ -170,9 +171,9 @@ func TestIntegrationPlaylist(t *testing.T) { t.Run("with dual write (unified storage, mode 1)", func(t *testing.T) { doPlaylistTests(t, apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{ - AppModeProduction: false, // required for unified storage + AppModeProduction: false, DisableAnonymous: true, - APIServerStorageType: "unified", // use the entity api tables + APIServerStorageType: options.StorageTypeUnifiedGrpc, // start a real grpc server EnableFeatureToggles: []string{}, })) }) diff --git a/pkg/tests/testinfra/testinfra.go b/pkg/tests/testinfra/testinfra.go index 37fce966a82..03bd4056e16 100644 --- a/pkg/tests/testinfra/testinfra.go +++ b/pkg/tests/testinfra/testinfra.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/grafana/pkg/infra/fs" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/server" + "github.com/grafana/grafana/pkg/services/apiserver/options" "github.com/grafana/grafana/pkg/services/org" "github.com/grafana/grafana/pkg/services/org/orgimpl" "github.com/grafana/grafana/pkg/services/quota/quotaimpl" @@ -29,6 +30,7 @@ import ( "github.com/grafana/grafana/pkg/services/user" "github.com/grafana/grafana/pkg/services/user/userimpl" "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/storage/unified/sql" ) // StartGrafana starts a Grafana server. @@ -70,6 +72,30 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes }) }) + // UnifiedStorageOverGRPC + var storage sql.UnifiedStorageGrpcService + unistore, _ := env.Cfg.Raw.GetSection("grafana-apiserver") + if unistore != nil && + unistore.Key("storage_type").MustString("") == string(options.StorageTypeUnifiedGrpc) && + unistore.Key("address").String() == "" { // no address is configured + copy := *env.Cfg + copy.GRPCServerNetwork = "tcp" + copy.GRPCServerAddress = "localhost:0" + copy.GRPCServerTLSConfig = nil + + storage, err = sql.ProvideUnifiedStorageGrpcService(©, env.FeatureToggles, env.SQLStore, env.Cfg.Logger) + require.NoError(t, err) + ctx := context.Background() + err = storage.StartAsync(ctx) + require.NoError(t, err) + err = storage.AwaitRunning(ctx) + require.NoError(t, err) + + _, err = unistore.NewKey("address", storage.GetAddress()) + require.NoError(t, err) + t.Logf("Unified storage running on %s", storage.GetAddress()) + } + go func() { // When the server runs, it will also build and initialize the service graph if err := env.Server.Run(); err != nil { @@ -80,6 +106,9 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes if err := env.Server.Shutdown(ctx, "test cleanup"); err != nil { t.Error("Timed out waiting on server to shut down") } + if storage != nil { + storage.StopAsync() + } }) // Wait for Grafana to be ready @@ -353,7 +382,7 @@ func CreateGrafDir(t *testing.T, opts ...GrafanaOpts) (string, string) { if o.APIServerStorageType != "" { section, err := getOrCreateSection("grafana-apiserver") require.NoError(t, err) - _, err = section.NewKey("storage_type", o.APIServerStorageType) + _, err = section.NewKey("storage_type", string(o.APIServerStorageType)) require.NoError(t, err) // Hardcoded local etcd until this is needed to run in CI @@ -442,9 +471,11 @@ type GrafanaOpts struct { EnableLog bool GRPCServerAddress string QueryRetries int64 - APIServerStorageType string GrafanaComAPIURL string UnifiedStorageConfig map[string]setting.UnifiedStorageConfig + + // When "unified-grpc" is selected it will also start the grpc server + APIServerStorageType options.StorageType } func CreateUser(t *testing.T, store db.DB, cfg *setting.Cfg, cmd user.CreateUserCommand) *user.User {