mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Chore: Remove context.TODO() from services (#42555)
* Remove context.TODO() from services * Fix live test
This commit is contained in:
parent
6f8e651cdb
commit
ff3cf94b56
@ -86,7 +86,7 @@ func (p *Provider) Get(ctx context.Context, pluginID string, datasourceUID strin
|
||||
}
|
||||
|
||||
if datasourceUID != "" {
|
||||
ds, err := p.DataSourceCache.GetDatasourceByUID(datasourceUID, user, skipCache)
|
||||
ds, err := p.DataSourceCache.GetDatasourceByUID(ctx, datasourceUID, user, skipCache)
|
||||
if err != nil {
|
||||
return pc, false, errutil.Wrap("Failed to get datasource", err)
|
||||
}
|
||||
|
@ -562,7 +562,7 @@ func TestOSSAccessControlService_GetUserPermissions(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// Test
|
||||
userPerms, err := ac.GetUserPermissions(context.TODO(), &tt.user)
|
||||
userPerms, err := ac.GetUserPermissions(context.Background(), &tt.user)
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err, "Expected an error with GetUserPermissions.")
|
||||
return
|
||||
|
@ -39,7 +39,7 @@ func TestEngineTimeouts(t *testing.T) {
|
||||
engine.evalHandler = evalHandler
|
||||
engine.resultHandler = resultHandler
|
||||
|
||||
err := engine.processJobWithRetry(context.TODO(), job)
|
||||
err := engine.processJobWithRetry(context.Background(), job)
|
||||
require.Nil(t, err)
|
||||
|
||||
require.Equal(t, true, evalHandler.EvalSucceed)
|
||||
|
@ -124,7 +124,7 @@ func TestEngineProcessJob(t *testing.T) {
|
||||
evalHandler := NewFakeEvalHandler(0)
|
||||
engine.evalHandler = evalHandler
|
||||
|
||||
err := engine.processJobWithRetry(context.TODO(), job)
|
||||
err := engine.processJobWithRetry(context.Background(), job)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, expectedAttempts, evalHandler.CallNb)
|
||||
})
|
||||
@ -134,7 +134,7 @@ func TestEngineProcessJob(t *testing.T) {
|
||||
evalHandler := NewFakeEvalHandler(1)
|
||||
engine.evalHandler = evalHandler
|
||||
|
||||
err := engine.processJobWithRetry(context.TODO(), job)
|
||||
err := engine.processJobWithRetry(context.Background(), job)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, expectedAttempts, evalHandler.CallNb)
|
||||
})
|
||||
@ -144,7 +144,7 @@ func TestEngineProcessJob(t *testing.T) {
|
||||
evalHandler := NewFakeEvalHandler(expectedAttempts)
|
||||
engine.evalHandler = evalHandler
|
||||
|
||||
err := engine.processJobWithRetry(context.TODO(), job)
|
||||
err := engine.processJobWithRetry(context.Background(), job)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, expectedAttempts, evalHandler.CallNb)
|
||||
})
|
||||
|
@ -15,7 +15,7 @@ import (
|
||||
)
|
||||
|
||||
func TestStateIsUpdatedWhenNeeded(t *testing.T) {
|
||||
ctx := NewEvalContext(context.TODO(), &Rule{Conditions: []Condition{&conditionStub{firing: true}}}, &validations.OSSPluginRequestValidator{})
|
||||
ctx := NewEvalContext(context.Background(), &Rule{Conditions: []Condition{&conditionStub{firing: true}}}, &validations.OSSPluginRequestValidator{})
|
||||
|
||||
t.Run("ok -> alerting", func(t *testing.T) {
|
||||
ctx.PrevAlertState = models.AlertStateOK
|
||||
|
@ -25,7 +25,7 @@ func TestAlertingEvaluationHandler(t *testing.T) {
|
||||
handler := NewEvalHandler(nil)
|
||||
|
||||
t.Run("Show return triggered with single passing condition", func(t *testing.T) {
|
||||
context := NewEvalContext(context.TODO(), &Rule{
|
||||
context := NewEvalContext(context.Background(), &Rule{
|
||||
Conditions: []Condition{&conditionStub{
|
||||
firing: true,
|
||||
}},
|
||||
@ -37,7 +37,7 @@ func TestAlertingEvaluationHandler(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("Show return triggered with single passing condition2", func(t *testing.T) {
|
||||
context := NewEvalContext(context.TODO(), &Rule{
|
||||
context := NewEvalContext(context.Background(), &Rule{
|
||||
Conditions: []Condition{&conditionStub{firing: true, operator: "and"}},
|
||||
}, &validations.OSSPluginRequestValidator{})
|
||||
|
||||
@ -47,7 +47,7 @@ func TestAlertingEvaluationHandler(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("Show return false with not passing asdf", func(t *testing.T) {
|
||||
context := NewEvalContext(context.TODO(), &Rule{
|
||||
context := NewEvalContext(context.Background(), &Rule{
|
||||
Conditions: []Condition{
|
||||
&conditionStub{firing: true, operator: "and", matches: []*EvalMatch{{}, {}}},
|
||||
&conditionStub{firing: false, operator: "and"},
|
||||
@ -60,7 +60,7 @@ func TestAlertingEvaluationHandler(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("Show return true if any of the condition is passing with OR operator", func(t *testing.T) {
|
||||
context := NewEvalContext(context.TODO(), &Rule{
|
||||
context := NewEvalContext(context.Background(), &Rule{
|
||||
Conditions: []Condition{
|
||||
&conditionStub{firing: true, operator: "and"},
|
||||
&conditionStub{firing: false, operator: "or"},
|
||||
@ -73,7 +73,7 @@ func TestAlertingEvaluationHandler(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("Show return false if any of the condition is failing with AND operator", func(t *testing.T) {
|
||||
context := NewEvalContext(context.TODO(), &Rule{
|
||||
context := NewEvalContext(context.Background(), &Rule{
|
||||
Conditions: []Condition{
|
||||
&conditionStub{firing: true, operator: "and"},
|
||||
&conditionStub{firing: false, operator: "and"},
|
||||
@ -86,7 +86,7 @@ func TestAlertingEvaluationHandler(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("Show return true if one condition is failing with nested OR operator", func(t *testing.T) {
|
||||
context := NewEvalContext(context.TODO(), &Rule{
|
||||
context := NewEvalContext(context.Background(), &Rule{
|
||||
Conditions: []Condition{
|
||||
&conditionStub{firing: true, operator: "and"},
|
||||
&conditionStub{firing: true, operator: "and"},
|
||||
@ -100,7 +100,7 @@ func TestAlertingEvaluationHandler(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("Show return false if one condition is passing with nested OR operator", func(t *testing.T) {
|
||||
context := NewEvalContext(context.TODO(), &Rule{
|
||||
context := NewEvalContext(context.Background(), &Rule{
|
||||
Conditions: []Condition{
|
||||
&conditionStub{firing: true, operator: "and"},
|
||||
&conditionStub{firing: false, operator: "and"},
|
||||
@ -114,7 +114,7 @@ func TestAlertingEvaluationHandler(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("Show return false if a condition is failing with nested AND operator", func(t *testing.T) {
|
||||
context := NewEvalContext(context.TODO(), &Rule{
|
||||
context := NewEvalContext(context.Background(), &Rule{
|
||||
Conditions: []Condition{
|
||||
&conditionStub{firing: true, operator: "and"},
|
||||
&conditionStub{firing: false, operator: "and"},
|
||||
@ -128,7 +128,7 @@ func TestAlertingEvaluationHandler(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("Show return true if a condition is passing with nested OR operator", func(t *testing.T) {
|
||||
context := NewEvalContext(context.TODO(), &Rule{
|
||||
context := NewEvalContext(context.Background(), &Rule{
|
||||
Conditions: []Condition{
|
||||
&conditionStub{firing: true, operator: "and"},
|
||||
&conditionStub{firing: false, operator: "or"},
|
||||
@ -142,7 +142,7 @@ func TestAlertingEvaluationHandler(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("Should return false if no condition is firing using OR operator", func(t *testing.T) {
|
||||
context := NewEvalContext(context.TODO(), &Rule{
|
||||
context := NewEvalContext(context.Background(), &Rule{
|
||||
Conditions: []Condition{
|
||||
&conditionStub{firing: false, operator: "or"},
|
||||
&conditionStub{firing: false, operator: "or"},
|
||||
@ -157,7 +157,7 @@ func TestAlertingEvaluationHandler(t *testing.T) {
|
||||
|
||||
// FIXME: What should the actual test case name be here?
|
||||
t.Run("Should not return NoDataFound if all conditions have data and using OR", func(t *testing.T) {
|
||||
context := NewEvalContext(context.TODO(), &Rule{
|
||||
context := NewEvalContext(context.Background(), &Rule{
|
||||
Conditions: []Condition{
|
||||
&conditionStub{operator: "or", noData: false},
|
||||
&conditionStub{operator: "or", noData: false},
|
||||
@ -170,7 +170,7 @@ func TestAlertingEvaluationHandler(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("Should return NoDataFound if one condition has no data", func(t *testing.T) {
|
||||
context := NewEvalContext(context.TODO(), &Rule{
|
||||
context := NewEvalContext(context.Background(), &Rule{
|
||||
Conditions: []Condition{
|
||||
&conditionStub{operator: "and", noData: true},
|
||||
},
|
||||
@ -182,7 +182,7 @@ func TestAlertingEvaluationHandler(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("Should return no data if at least one condition has no data and using AND", func(t *testing.T) {
|
||||
context := NewEvalContext(context.TODO(), &Rule{
|
||||
context := NewEvalContext(context.Background(), &Rule{
|
||||
Conditions: []Condition{
|
||||
&conditionStub{operator: "and", noData: true},
|
||||
&conditionStub{operator: "and", noData: false},
|
||||
@ -194,7 +194,7 @@ func TestAlertingEvaluationHandler(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("Should return no data if at least one condition has no data and using OR", func(t *testing.T) {
|
||||
context := NewEvalContext(context.TODO(), &Rule{
|
||||
context := NewEvalContext(context.Background(), &Rule{
|
||||
Conditions: []Condition{
|
||||
&conditionStub{operator: "or", noData: true},
|
||||
&conditionStub{operator: "or", noData: false},
|
||||
|
@ -72,7 +72,7 @@ func TestWhenAlertManagerShouldNotify(t *testing.T) {
|
||||
|
||||
evalContext.Rule.State = tc.newState
|
||||
|
||||
res := am.ShouldNotify(context.TODO(), evalContext, &models.AlertNotificationState{})
|
||||
res := am.ShouldNotify(context.Background(), evalContext, &models.AlertNotificationState{})
|
||||
if res != tc.expect {
|
||||
t.Errorf("got %v expected %v", res, tc.expect)
|
||||
}
|
||||
|
@ -257,7 +257,7 @@ func TestSendSlackRequest(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
slackNotifier := not.(*SlackNotifier)
|
||||
|
||||
err = slackNotifier.sendRequest(context.TODO(), []byte("test"))
|
||||
err = slackNotifier.sendRequest(context.Background(), []byte("test"))
|
||||
if !test.expectError {
|
||||
require.NoError(tt, err)
|
||||
} else {
|
||||
|
@ -47,7 +47,7 @@ func (srv *CleanUpService) Run(ctx context.Context) error {
|
||||
defer cancelFn()
|
||||
|
||||
srv.cleanUpTmpFiles()
|
||||
srv.deleteExpiredSnapshots()
|
||||
srv.deleteExpiredSnapshots(ctx)
|
||||
srv.deleteExpiredDashboardVersions(ctx)
|
||||
srv.cleanUpOldAnnotations(ctxWithTimeout)
|
||||
srv.expireOldUserInvites(ctx)
|
||||
@ -125,9 +125,9 @@ func (srv *CleanUpService) shouldCleanupTempFile(filemtime time.Time, now time.T
|
||||
return filemtime.Add(srv.Cfg.TempDataLifetime).Before(now)
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) deleteExpiredSnapshots() {
|
||||
func (srv *CleanUpService) deleteExpiredSnapshots(ctx context.Context) {
|
||||
cmd := models.DeleteExpiredSnapshotsCommand{}
|
||||
if err := bus.DispatchCtx(context.TODO(), &cmd); err != nil {
|
||||
if err := bus.DispatchCtx(ctx, &cmd); err != nil {
|
||||
srv.log.Error("Failed to delete expired snapshots", "error", err.Error())
|
||||
} else {
|
||||
srv.log.Debug("Deleted expired snapshots", "rows affected", cmd.DeletedRows)
|
||||
|
@ -129,7 +129,7 @@ func (dr *dashboardServiceImpl) buildSaveDashboardCommand(ctx context.Context, d
|
||||
}
|
||||
|
||||
if isParentFolderChanged {
|
||||
folderGuardian := guardian.New(context.TODO(), dash.FolderId, dto.OrgId, dto.User)
|
||||
folderGuardian := guardian.New(ctx, dash.FolderId, dto.OrgId, dto.User)
|
||||
if canSave, err := folderGuardian.CanSave(); err != nil || !canSave {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -149,7 +149,7 @@ func (dr *dashboardServiceImpl) buildSaveDashboardCommand(ctx context.Context, d
|
||||
}
|
||||
}
|
||||
|
||||
guard := guardian.New(context.TODO(), dash.GetDashboardIdForSavePermissionCheck(), dto.OrgId, dto.User)
|
||||
guard := guardian.New(ctx, dash.GetDashboardIdForSavePermissionCheck(), dto.OrgId, dto.User)
|
||||
if canSave, err := guard.CanSave(); err != nil || !canSave {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -19,7 +19,7 @@ func ProvideCacheService(cacheService *localcache.CacheService, sqlStore *sqlsto
|
||||
|
||||
type CacheService interface {
|
||||
GetDatasource(datasourceID int64, user *models.SignedInUser, skipCache bool) (*models.DataSource, error)
|
||||
GetDatasourceByUID(datasourceUID string, user *models.SignedInUser, skipCache bool) (*models.DataSource, error)
|
||||
GetDatasourceByUID(ctx context.Context, datasourceUID string, user *models.SignedInUser, skipCache bool) (*models.DataSource, error)
|
||||
}
|
||||
|
||||
type CacheServiceImpl struct {
|
||||
@ -61,6 +61,7 @@ func (dc *CacheServiceImpl) GetDatasource(
|
||||
}
|
||||
|
||||
func (dc *CacheServiceImpl) GetDatasourceByUID(
|
||||
ctx context.Context,
|
||||
datasourceUID string,
|
||||
user *models.SignedInUser,
|
||||
skipCache bool,
|
||||
@ -84,7 +85,7 @@ func (dc *CacheServiceImpl) GetDatasourceByUID(
|
||||
|
||||
plog.Debug("Querying for data source via SQL store", "uid", datasourceUID, "orgId", user.OrgId)
|
||||
query := &models.GetDataSourceQuery{Uid: datasourceUID, OrgId: user.OrgId}
|
||||
err := dc.SQLStore.GetDataSource(context.TODO(), query)
|
||||
err := dc.SQLStore.GetDataSource(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ import (
|
||||
//go:generate mockgen -destination=plugin_mock.go -package=features github.com/grafana/grafana/pkg/services/live/features PluginContextGetter
|
||||
|
||||
type PluginContextGetter interface {
|
||||
GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error)
|
||||
GetPluginContext(ctx context.Context, user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error)
|
||||
}
|
||||
|
||||
// PluginRunner can handle streaming operations for channels belonging to plugins.
|
||||
@ -61,7 +61,7 @@ type PluginPathRunner struct {
|
||||
|
||||
// OnSubscribe passes control to a plugin.
|
||||
func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
|
||||
pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID, false)
|
||||
pCtx, found, err := r.pluginContextGetter.GetPluginContext(ctx, user, r.pluginID, r.datasourceUID, false)
|
||||
if err != nil {
|
||||
logger.Error("Get plugin context error", "error", err, "path", r.path)
|
||||
return models.SubscribeReply{}, 0, err
|
||||
@ -105,7 +105,7 @@ func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedI
|
||||
|
||||
// OnPublish passes control to a plugin.
|
||||
func (r *PluginPathRunner) OnPublish(ctx context.Context, user *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
|
||||
pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID, false)
|
||||
pCtx, found, err := r.pluginContextGetter.GetPluginContext(ctx, user, r.pluginID, r.datasourceUID, false)
|
||||
if err != nil {
|
||||
logger.Error("Get plugin context error", "error", err, "path", r.path)
|
||||
return models.PublishReply{}, 0, err
|
||||
|
@ -5,6 +5,7 @@
|
||||
package features
|
||||
|
||||
import (
|
||||
"context"
|
||||
reflect "reflect"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
@ -36,7 +37,7 @@ func (m *MockPluginContextGetter) EXPECT() *MockPluginContextGetterMockRecorder
|
||||
}
|
||||
|
||||
// GetPluginContext mocks base method.
|
||||
func (m *MockPluginContextGetter) GetPluginContext(arg0 *models.SignedInUser, arg1, arg2 string) (backend.PluginContext, bool, error) {
|
||||
func (m *MockPluginContextGetter) GetPluginContext(ctx context.Context, arg0 *models.SignedInUser, arg1, arg2 string) (backend.PluginContext, bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetPluginContext", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].(backend.PluginContext)
|
||||
@ -46,7 +47,7 @@ func (m *MockPluginContextGetter) GetPluginContext(arg0 *models.SignedInUser, ar
|
||||
}
|
||||
|
||||
// GetPluginContext indicates an expected call of GetPluginContext.
|
||||
func (mr *MockPluginContextGetterMockRecorder) GetPluginContext(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
func (mr *MockPluginContextGetterMockRecorder) GetPluginContext(ctx context.Context, arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPluginContext", reflect.TypeOf((*MockPluginContextGetter)(nil).GetPluginContext), arg0, arg1, arg2)
|
||||
}
|
||||
|
@ -920,7 +920,7 @@ func (g *GrafanaLive) handleStreamScope(u *models.SignedInUser, namespace string
|
||||
}
|
||||
|
||||
func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
|
||||
ds, err := g.DataSourceCache.GetDatasourceByUID(namespace, user, false)
|
||||
ds, err := g.DataSourceCache.GetDatasourceByUID(context.TODO(), namespace, user, false)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting datasource: %w", err)
|
||||
}
|
||||
|
@ -69,6 +69,6 @@ func NewContextGetter(pluginContextProvider *plugincontext.Provider) *ContextGet
|
||||
}
|
||||
}
|
||||
|
||||
func (g *ContextGetter) GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
return g.PluginContextProvider.Get(context.TODO(), pluginID, datasourceUID, user, skipCache)
|
||||
func (g *ContextGetter) GetPluginContext(ctx context.Context, user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
return g.PluginContextProvider.Get(ctx, pluginID, datasourceUID, user, skipCache)
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ type ChannelLocalPublisher interface {
|
||||
}
|
||||
|
||||
type PluginContextGetter interface {
|
||||
GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error)
|
||||
GetPluginContext(ctx context.Context, user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error)
|
||||
}
|
||||
|
||||
type NumLocalSubscribersGetter interface {
|
||||
@ -182,7 +182,7 @@ func (s *Manager) watchStream(ctx context.Context, cancelFn func(), sr streamReq
|
||||
case <-datasourceTicker.C:
|
||||
if sr.PluginContext.DataSourceInstanceSettings != nil {
|
||||
dsUID := sr.PluginContext.DataSourceInstanceSettings.UID
|
||||
pCtx, ok, err := s.pluginContextGetter.GetPluginContext(sr.user, sr.PluginContext.PluginID, dsUID, false)
|
||||
pCtx, ok, err := s.pluginContextGetter.GetPluginContext(ctx, sr.user, sr.PluginContext.PluginID, dsUID, false)
|
||||
if err != nil {
|
||||
logger.Error("Error getting datasource context", "channel", sr.Channel, "path", sr.Path, "error", err)
|
||||
continue
|
||||
@ -283,7 +283,7 @@ func (s *Manager) runStream(ctx context.Context, cancelFn func(), sr streamReque
|
||||
if pluginCtx.DataSourceInstanceSettings != nil {
|
||||
datasourceUID = pluginCtx.DataSourceInstanceSettings.UID
|
||||
}
|
||||
newPluginCtx, ok, err := s.pluginContextGetter.GetPluginContext(sr.user, pluginCtx.PluginID, datasourceUID, false)
|
||||
newPluginCtx, ok, err := s.pluginContextGetter.GetPluginContext(ctx, sr.user, pluginCtx.PluginID, datasourceUID, false)
|
||||
if err != nil {
|
||||
logger.Error("Error getting plugin context", "path", sr.Path, "error", err)
|
||||
isReconnect = true
|
||||
@ -407,7 +407,7 @@ func (s *Manager) SubmitStream(ctx context.Context, user *models.SignedInUser, c
|
||||
if pCtx.DataSourceInstanceSettings != nil {
|
||||
datasourceUID = pCtx.DataSourceInstanceSettings.UID
|
||||
}
|
||||
newPluginCtx, ok, err := s.pluginContextGetter.GetPluginContext(user, pCtx.PluginID, datasourceUID, false)
|
||||
newPluginCtx, ok, err := s.pluginContextGetter.GetPluginContext(ctx, user, pCtx.PluginID, datasourceUID, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
mockContextGetter.EXPECT().GetPluginContext(context.Background(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
require.Equal(t, int64(2), user.UserId)
|
||||
require.Equal(t, int64(1), user.OrgId)
|
||||
require.Equal(t, testPluginContext.PluginID, pluginID)
|
||||
@ -83,7 +83,7 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
|
||||
|
||||
mockStreamRunner := NewMockStreamRunner(mockCtrl)
|
||||
mockStreamRunner.EXPECT().RunStream(
|
||||
gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
||||
require.Equal(t, "test", req.Path)
|
||||
close(startedCh)
|
||||
@ -133,13 +133,13 @@ func TestStreamManager_SubmitStream_DifferentOrgID(t *testing.T) {
|
||||
mockPacketSender.EXPECT().PublishLocal("1/test", gomock.Any()).Times(1)
|
||||
mockPacketSender.EXPECT().PublishLocal("2/test", gomock.Any()).Times(1)
|
||||
|
||||
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
mockContextGetter.EXPECT().GetPluginContext(context.Background(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
return backend.PluginContext{}, true, nil
|
||||
}).Times(0)
|
||||
|
||||
mockStreamRunner1 := NewMockStreamRunner(mockCtrl)
|
||||
mockStreamRunner1.EXPECT().RunStream(
|
||||
gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
||||
require.Equal(t, "test", req.Path)
|
||||
close(startedCh1)
|
||||
@ -152,7 +152,7 @@ func TestStreamManager_SubmitStream_DifferentOrgID(t *testing.T) {
|
||||
|
||||
mockStreamRunner2 := NewMockStreamRunner(mockCtrl)
|
||||
mockStreamRunner2.EXPECT().RunStream(
|
||||
gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
||||
require.Equal(t, "test", req.Path)
|
||||
close(startedCh2)
|
||||
@ -205,14 +205,14 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) {
|
||||
startedCh := make(chan struct{})
|
||||
doneCh := make(chan struct{})
|
||||
|
||||
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
mockContextGetter.EXPECT().GetPluginContext(context.Background(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
return backend.PluginContext{}, true, nil
|
||||
}).Times(0)
|
||||
|
||||
mockNumSubscribersGetter.EXPECT().GetNumLocalSubscribers("1/test").Return(0, nil).Times(3)
|
||||
|
||||
mockStreamRunner := NewMockStreamRunner(mockCtrl)
|
||||
mockStreamRunner.EXPECT().RunStream(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
||||
mockStreamRunner.EXPECT().RunStream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
||||
close(startedCh)
|
||||
<-ctx.Done()
|
||||
close(doneCh)
|
||||
@ -254,7 +254,7 @@ func TestStreamManager_SubmitStream_ErrorRestartsRunStream(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
require.Equal(t, int64(2), user.UserId)
|
||||
require.Equal(t, int64(1), user.OrgId)
|
||||
require.Equal(t, testPluginContext.PluginID, pluginID)
|
||||
@ -264,7 +264,7 @@ func TestStreamManager_SubmitStream_ErrorRestartsRunStream(t *testing.T) {
|
||||
|
||||
mockStreamRunner := NewMockStreamRunner(mockCtrl)
|
||||
mockStreamRunner.EXPECT().RunStream(
|
||||
gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
||||
if currentErrors >= numErrors {
|
||||
return nil
|
||||
@ -296,13 +296,13 @@ func TestStreamManager_SubmitStream_NilErrorStopsRunStream(t *testing.T) {
|
||||
_ = manager.Run(ctx)
|
||||
}()
|
||||
|
||||
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
mockContextGetter.EXPECT().GetPluginContext(context.Background(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
return backend.PluginContext{}, true, nil
|
||||
}).Times(0)
|
||||
|
||||
mockStreamRunner := NewMockStreamRunner(mockCtrl)
|
||||
mockStreamRunner.EXPECT().RunStream(
|
||||
gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
||||
return nil
|
||||
}).Times(1)
|
||||
@ -337,7 +337,7 @@ func TestStreamManager_HandleDatasourceUpdate(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
require.Equal(t, int64(2), user.UserId)
|
||||
require.Equal(t, int64(1), user.OrgId)
|
||||
require.Equal(t, testPluginContext.PluginID, pluginID)
|
||||
@ -352,7 +352,7 @@ func TestStreamManager_HandleDatasourceUpdate(t *testing.T) {
|
||||
|
||||
mockStreamRunner := NewMockStreamRunner(mockCtrl)
|
||||
mockStreamRunner.EXPECT().RunStream(
|
||||
gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
||||
if isFirstCall {
|
||||
// first RunStream will wait till context done.
|
||||
@ -403,7 +403,7 @@ func TestStreamManager_HandleDatasourceDelete(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
mockContextGetter.EXPECT().GetPluginContext(context.Background(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
require.Equal(t, int64(2), user.UserId)
|
||||
require.Equal(t, int64(1), user.OrgId)
|
||||
require.Equal(t, testPluginContext.PluginID, pluginID)
|
||||
@ -415,7 +415,7 @@ func TestStreamManager_HandleDatasourceDelete(t *testing.T) {
|
||||
|
||||
mockStreamRunner := NewMockStreamRunner(mockCtrl)
|
||||
mockStreamRunner.EXPECT().RunStream(
|
||||
gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
||||
close(doneCh)
|
||||
<-ctx.Done()
|
||||
|
@ -120,7 +120,7 @@ func (m *MockStreamRunner) RunStream(arg0 context.Context, arg1 *backend.RunStre
|
||||
}
|
||||
|
||||
// RunStream indicates an expected call of RunStream.
|
||||
func (mr *MockStreamRunnerMockRecorder) RunStream(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
func (mr *MockStreamRunnerMockRecorder) RunStream(ctx, arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunStream", reflect.TypeOf((*MockStreamRunner)(nil).RunStream), arg0, arg1, arg2)
|
||||
}
|
||||
@ -149,9 +149,9 @@ func (m *MockPluginContextGetter) EXPECT() *MockPluginContextGetterMockRecorder
|
||||
}
|
||||
|
||||
// GetPluginContext mocks base method.
|
||||
func (m *MockPluginContextGetter) GetPluginContext(arg0 *models.SignedInUser, arg1, arg2 string, arg3 bool) (backend.PluginContext, bool, error) {
|
||||
func (m *MockPluginContextGetter) GetPluginContext(ctx context.Context, arg0 *models.SignedInUser, arg1, arg2 string, arg3 bool) (backend.PluginContext, bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetPluginContext", arg0, arg1, arg2, arg3)
|
||||
ret := m.ctrl.Call(m, "GetPluginContext", ctx, arg0, arg1, arg2, arg3)
|
||||
ret0, _ := ret[0].(backend.PluginContext)
|
||||
ret1, _ := ret[1].(bool)
|
||||
ret2, _ := ret[2].(error)
|
||||
@ -159,7 +159,7 @@ func (m *MockPluginContextGetter) GetPluginContext(arg0 *models.SignedInUser, ar
|
||||
}
|
||||
|
||||
// GetPluginContext indicates an expected call of GetPluginContext.
|
||||
func (mr *MockPluginContextGetterMockRecorder) GetPluginContext(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
|
||||
func (mr *MockPluginContextGetterMockRecorder) GetPluginContext(ctx, arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPluginContext", reflect.TypeOf((*MockPluginContextGetter)(nil).GetPluginContext), arg0, arg1, arg2, arg3)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPluginContext", reflect.TypeOf((*MockPluginContextGetter)(nil).GetPluginContext), ctx, arg0, arg1, arg2, arg3)
|
||||
}
|
||||
|
@ -247,7 +247,7 @@ func (srv RulerSrv) RoutePostNameRulesConfig(c *models.ReqContext, ruleGroupConf
|
||||
OrgID: c.SignedInUser.OrgId,
|
||||
Data: r.GrafanaManagedAlert.Data,
|
||||
}
|
||||
if err := validateCondition(cond, c.SignedInUser, c.SkipCache, srv.DatasourceCache); err != nil {
|
||||
if err := validateCondition(c.Req.Context(), cond, c.SignedInUser, c.SkipCache, srv.DatasourceCache); err != nil {
|
||||
return ErrResp(http.StatusBadRequest, err, "failed to validate alert rule %q", r.GrafanaManagedAlert.Title)
|
||||
}
|
||||
if r.GrafanaManagedAlert.UID != "" {
|
||||
|
@ -81,7 +81,7 @@ func (srv TestingApiSrv) RouteEvalQueries(c *models.ReqContext, cmd apimodels.Ev
|
||||
now = timeNow()
|
||||
}
|
||||
|
||||
if _, err := validateQueriesAndExpressions(cmd.Data, c.SignedInUser, c.SkipCache, srv.DatasourceCache); err != nil {
|
||||
if _, err := validateQueriesAndExpressions(c.Req.Context(), cmd.Data, c.SignedInUser, c.SkipCache, srv.DatasourceCache); err != nil {
|
||||
return ErrResp(http.StatusBadRequest, err, "invalid queries or expressions")
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@ package api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
@ -176,12 +177,12 @@ func messageExtractor(resp *response.NormalResponse) (interface{}, error) {
|
||||
return map[string]string{"message": string(resp.Body())}, nil
|
||||
}
|
||||
|
||||
func validateCondition(c ngmodels.Condition, user *models.SignedInUser, skipCache bool, datasourceCache datasources.CacheService) error {
|
||||
func validateCondition(ctx context.Context, c ngmodels.Condition, user *models.SignedInUser, skipCache bool, datasourceCache datasources.CacheService) error {
|
||||
if len(c.Data) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
refIDs, err := validateQueriesAndExpressions(c.Data, user, skipCache, datasourceCache)
|
||||
refIDs, err := validateQueriesAndExpressions(ctx, c.Data, user, skipCache, datasourceCache)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -196,7 +197,7 @@ func validateCondition(c ngmodels.Condition, user *models.SignedInUser, skipCach
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateQueriesAndExpressions(data []ngmodels.AlertQuery, user *models.SignedInUser, skipCache bool, datasourceCache datasources.CacheService) (map[string]struct{}, error) {
|
||||
func validateQueriesAndExpressions(ctx context.Context, data []ngmodels.AlertQuery, user *models.SignedInUser, skipCache bool, datasourceCache datasources.CacheService) (map[string]struct{}, error) {
|
||||
refIDs := make(map[string]struct{})
|
||||
if len(data) == 0 {
|
||||
return nil, nil
|
||||
@ -217,7 +218,7 @@ func validateQueriesAndExpressions(data []ngmodels.AlertQuery, user *models.Sign
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = datasourceCache.GetDatasourceByUID(datasourceUID, user, skipCache)
|
||||
_, err = datasourceCache.GetDatasourceByUID(ctx, datasourceUID, user, skipCache)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid query %s: %w: %s", query.RefID, err, datasourceUID)
|
||||
}
|
||||
@ -232,7 +233,7 @@ func conditionEval(c *models.ReqContext, cmd ngmodels.EvalAlertConditionCommand,
|
||||
OrgID: c.SignedInUser.OrgId,
|
||||
Data: cmd.Data,
|
||||
}
|
||||
if err := validateCondition(evalCond, c.SignedInUser, c.SkipCache, datasourceCache); err != nil {
|
||||
if err := validateCondition(c.Req.Context(), evalCond, c.SignedInUser, c.SkipCache, datasourceCache); err != nil {
|
||||
return ErrResp(http.StatusBadRequest, err, "invalid condition")
|
||||
}
|
||||
|
||||
|
@ -157,7 +157,7 @@ func GetExprRequest(ctx AlertExecCtx, data []models.AlertQuery, now time.Time, d
|
||||
if expr.IsDataSource(q.DatasourceUID) {
|
||||
ds = expr.DataSourceModel()
|
||||
} else {
|
||||
ds, err = dsCacheService.GetDatasourceByUID(q.DatasourceUID, &m.SignedInUser{
|
||||
ds, err = dsCacheService.GetDatasourceByUID(ctx.Ctx, q.DatasourceUID, &m.SignedInUser{
|
||||
OrgId: ctx.OrgID,
|
||||
OrgRole: m.ROLE_ADMIN, // Get DS as admin for service, API calls (test/post) must check permissions based on user.
|
||||
}, true)
|
||||
|
@ -59,7 +59,7 @@ func (s *Service) Run(ctx context.Context) error {
|
||||
|
||||
// QueryData can process queries and return query responses.
|
||||
func (s *Service) QueryData(ctx context.Context, user *models.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest, handleExpressions bool) (*backend.QueryDataResponse, error) {
|
||||
parsedReq, err := s.parseMetricRequest(user, skipCache, reqDTO)
|
||||
parsedReq, err := s.parseMetricRequest(ctx, user, skipCache, reqDTO)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -152,7 +152,7 @@ type parsedRequest struct {
|
||||
parsedQueries []parsedQuery
|
||||
}
|
||||
|
||||
func (s *Service) parseMetricRequest(user *models.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest) (*parsedRequest, error) {
|
||||
func (s *Service) parseMetricRequest(ctx context.Context, user *models.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest) (*parsedRequest, error) {
|
||||
if len(reqDTO.Queries) == 0 {
|
||||
return nil, NewErrBadQuery("no queries found")
|
||||
}
|
||||
@ -166,7 +166,7 @@ func (s *Service) parseMetricRequest(user *models.SignedInUser, skipCache bool,
|
||||
// Parse the queries
|
||||
datasourcesByUid := map[string]*models.DataSource{}
|
||||
for _, query := range reqDTO.Queries {
|
||||
ds, err := s.getDataSourceFromQuery(user, skipCache, query, datasourcesByUid)
|
||||
ds, err := s.getDataSourceFromQuery(ctx, user, skipCache, query, datasourcesByUid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -212,7 +212,7 @@ func (s *Service) parseMetricRequest(user *models.SignedInUser, skipCache bool,
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (s *Service) getDataSourceFromQuery(user *models.SignedInUser, skipCache bool, query *simplejson.Json, history map[string]*models.DataSource) (*models.DataSource, error) {
|
||||
func (s *Service) getDataSourceFromQuery(ctx context.Context, user *models.SignedInUser, skipCache bool, query *simplejson.Json, history map[string]*models.DataSource) (*models.DataSource, error) {
|
||||
var err error
|
||||
uid := query.Get("datasource").Get("uid").MustString()
|
||||
|
||||
@ -246,7 +246,7 @@ func (s *Service) getDataSourceFromQuery(user *models.SignedInUser, skipCache bo
|
||||
}
|
||||
|
||||
if uid != "" {
|
||||
ds, err = s.dataSourceCache.GetDatasourceByUID(uid, user, skipCache)
|
||||
ds, err = s.dataSourceCache.GetDatasourceByUID(ctx, uid, user, skipCache)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user