mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Chore: Add user service method SetUsingOrg and GetSignedInUserWithCacheCtx (#53343)
* Chore: Add user service method SetUsingOrg * Chore: Add user service method GetSignedInUserWithCacheCtx * Use method GetSignedInUserWithCacheCtx from user service * Fix lint after rebase * Fix lint * Fix lint error * roll back some changes * Roll back changes in api and middleware * Add xorm tags to SignedInUser ID fields
This commit is contained in:
@@ -43,7 +43,7 @@ func (b *BroadcastRunner) OnSubscribe(_ context.Context, u *user.SignedInUser, e
|
||||
JoinLeave: true,
|
||||
}
|
||||
query := &models.GetLiveMessageQuery{
|
||||
OrgId: u.OrgId,
|
||||
OrgId: u.OrgID,
|
||||
Channel: e.Channel,
|
||||
}
|
||||
msg, ok, err := b.liveMessageStore.GetLiveMessage(query)
|
||||
@@ -59,7 +59,7 @@ func (b *BroadcastRunner) OnSubscribe(_ context.Context, u *user.SignedInUser, e
|
||||
// OnPublish is called when a client wants to broadcast on the websocket
|
||||
func (b *BroadcastRunner) OnPublish(_ context.Context, u *user.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
|
||||
query := &models.SaveLiveMessageQuery{
|
||||
OrgId: u.OrgId,
|
||||
OrgId: u.OrgID,
|
||||
Channel: e.Channel,
|
||||
Data: e.Data,
|
||||
}
|
||||
|
@@ -43,7 +43,7 @@ func TestBroadcastRunner_OnSubscribe(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
reply, status, err := handler.OnSubscribe(
|
||||
context.Background(),
|
||||
&user.SignedInUser{OrgId: 1, UserId: 2},
|
||||
&user.SignedInUser{OrgID: 1, UserID: 2},
|
||||
models.SubscribeEvent{Channel: channel, Path: "test"},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
@@ -77,7 +77,7 @@ func TestBroadcastRunner_OnPublish(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
reply, status, err := handler.OnPublish(
|
||||
context.Background(),
|
||||
&user.SignedInUser{OrgId: 1, UserId: 2},
|
||||
&user.SignedInUser{OrgID: 1, UserID: 2},
|
||||
models.PublishEvent{Channel: channel, Path: "test", Data: data},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
@@ -33,7 +33,7 @@ func (h *CommentHandler) OnSubscribe(ctx context.Context, user *user.SignedInUse
|
||||
}
|
||||
objectType := parts[0]
|
||||
objectID := parts[1]
|
||||
ok, err := h.permissionChecker.CheckReadPermissions(ctx, user.OrgId, user, objectType, objectID)
|
||||
ok, err := h.permissionChecker.CheckReadPermissions(ctx, user.OrgID, user, objectType, objectID)
|
||||
if err != nil {
|
||||
return models.SubscribeReply{}, 0, err
|
||||
}
|
||||
|
@@ -66,14 +66,14 @@ func (h *DashboardHandler) OnSubscribe(ctx context.Context, user *user.SignedInU
|
||||
|
||||
// make sure can view this dashboard
|
||||
if len(parts) == 2 && parts[0] == "uid" {
|
||||
query := models.GetDashboardQuery{Uid: parts[1], OrgId: user.OrgId}
|
||||
query := models.GetDashboardQuery{Uid: parts[1], OrgId: user.OrgID}
|
||||
if err := h.DashboardService.GetDashboard(ctx, &query); err != nil {
|
||||
logger.Error("Error getting dashboard", "query", query, "error", err)
|
||||
return models.SubscribeReply{}, backend.SubscribeStreamStatusNotFound, nil
|
||||
}
|
||||
|
||||
dash := query.Result
|
||||
guard := guardian.New(ctx, dash.Id, user.OrgId, user)
|
||||
guard := guardian.New(ctx, dash.Id, user.OrgID, user)
|
||||
if canView, err := guard.CanView(); err != nil || !canView {
|
||||
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
|
||||
}
|
||||
@@ -113,13 +113,13 @@ func (h *DashboardHandler) OnPublish(ctx context.Context, user *user.SignedInUse
|
||||
// just ignore the event
|
||||
return models.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("ignore???")
|
||||
}
|
||||
query := models.GetDashboardQuery{Uid: parts[1], OrgId: user.OrgId}
|
||||
query := models.GetDashboardQuery{Uid: parts[1], OrgId: user.OrgID}
|
||||
if err := h.DashboardService.GetDashboard(ctx, &query); err != nil {
|
||||
logger.Error("Unknown dashboard", "query", query)
|
||||
return models.PublishReply{}, backend.PublishStreamStatusNotFound, nil
|
||||
}
|
||||
|
||||
guard := guardian.New(ctx, query.Result.Id, user.OrgId, user)
|
||||
guard := guardian.New(ctx, query.Result.Id, user.OrgID, user)
|
||||
canEdit, err := guard.CanEdit()
|
||||
if err != nil {
|
||||
return models.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("internal error")
|
||||
|
@@ -84,7 +84,7 @@ func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *user.SignedInU
|
||||
return models.SubscribeReply{}, resp.Status, nil
|
||||
}
|
||||
|
||||
submitResult, err := r.runStreamManager.SubmitStream(ctx, user, orgchannel.PrependOrgID(user.OrgId, e.Channel), r.path, e.Data, pCtx, r.handler, false)
|
||||
submitResult, err := r.runStreamManager.SubmitStream(ctx, user, orgchannel.PrependOrgID(user.OrgID, e.Channel), r.path, e.Data, pCtx, r.handler, false)
|
||||
if err != nil {
|
||||
logger.Error("Error submitting stream to manager", "error", err, "path", r.path)
|
||||
return models.SubscribeReply{}, 0, centrifuge.ErrorInternal
|
||||
|
@@ -357,7 +357,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r
|
||||
|
||||
// Centrifuge expects Credentials in context with a current user ID.
|
||||
cred := ¢rifuge.Credentials{
|
||||
UserID: fmt.Sprintf("%d", user.UserId),
|
||||
UserID: fmt.Sprintf("%d", user.UserID),
|
||||
}
|
||||
newCtx := centrifuge.SetCredentials(ctx.Req.Context(), cred)
|
||||
newCtx = livecontext.SetContextSignedUser(newCtx, user)
|
||||
@@ -630,7 +630,7 @@ func (g *GrafanaLive) handleOnSubscribe(ctx context.Context, client *centrifuge.
|
||||
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
|
||||
}
|
||||
|
||||
if user.OrgId != orgID {
|
||||
if user.OrgID != orgID {
|
||||
logger.Info("Error subscribing: wrong orgId", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
|
||||
return centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied
|
||||
}
|
||||
@@ -640,7 +640,7 @@ func (g *GrafanaLive) handleOnSubscribe(ctx context.Context, client *centrifuge.
|
||||
var ruleFound bool
|
||||
|
||||
if g.Pipeline != nil {
|
||||
rule, ok, err := g.Pipeline.Get(user.OrgId, channel)
|
||||
rule, ok, err := g.Pipeline.Get(user.OrgID, channel)
|
||||
if err != nil {
|
||||
logger.Error("Error getting channel rule", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
|
||||
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
|
||||
@@ -730,13 +730,13 @@ func (g *GrafanaLive) handleOnPublish(ctx context.Context, client *centrifuge.Cl
|
||||
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
|
||||
}
|
||||
|
||||
if user.OrgId != orgID {
|
||||
if user.OrgID != orgID {
|
||||
logger.Info("Error subscribing: wrong orgId", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
|
||||
return centrifuge.PublishReply{}, centrifuge.ErrorPermissionDenied
|
||||
}
|
||||
|
||||
if g.Pipeline != nil {
|
||||
rule, ok, err := g.Pipeline.Get(user.OrgId, channel)
|
||||
rule, ok, err := g.Pipeline.Get(user.OrgID, channel)
|
||||
if err != nil {
|
||||
logger.Error("Error getting channel rule", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
|
||||
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
|
||||
@@ -760,7 +760,7 @@ func (g *GrafanaLive) handleOnPublish(ctx context.Context, client *centrifuge.Cl
|
||||
return centrifuge.PublishReply{}, ¢rifuge.Error{Code: uint32(code), Message: text}
|
||||
}
|
||||
}
|
||||
_, err := g.Pipeline.ProcessInput(client.Context(), user.OrgId, channel, e.Data)
|
||||
_, err := g.Pipeline.ProcessInput(client.Context(), user.OrgID, channel, e.Data)
|
||||
if err != nil {
|
||||
logger.Error("Error processing input", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
|
||||
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
|
||||
@@ -919,7 +919,7 @@ func (g *GrafanaLive) handlePluginScope(ctx context.Context, _ *user.SignedInUse
|
||||
}
|
||||
|
||||
func (g *GrafanaLive) handleStreamScope(u *user.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
|
||||
return g.ManagedStreamRunner.GetOrCreateStream(u.OrgId, live.ScopeStream, namespace)
|
||||
return g.ManagedStreamRunner.GetOrCreateStream(u.OrgID, live.ScopeStream, namespace)
|
||||
}
|
||||
|
||||
func (g *GrafanaLive) handleDatasourceScope(ctx context.Context, user *user.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
|
||||
@@ -965,12 +965,12 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext) response.Respons
|
||||
return response.Error(http.StatusBadRequest, "invalid channel ID", nil)
|
||||
}
|
||||
|
||||
logger.Debug("Publish API cmd", "user", ctx.SignedInUser.UserId, "channel", cmd.Channel)
|
||||
logger.Debug("Publish API cmd", "user", ctx.SignedInUser.UserID, "channel", cmd.Channel)
|
||||
user := ctx.SignedInUser
|
||||
channel := cmd.Channel
|
||||
|
||||
if g.Pipeline != nil {
|
||||
rule, ok, err := g.Pipeline.Get(user.OrgId, channel)
|
||||
rule, ok, err := g.Pipeline.Get(user.OrgID, channel)
|
||||
if err != nil {
|
||||
logger.Error("Error getting channel rule", "user", user, "channel", channel, "error", err)
|
||||
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
|
||||
@@ -990,7 +990,7 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext) response.Respons
|
||||
return response.Error(http.StatusForbidden, http.StatusText(http.StatusForbidden), nil)
|
||||
}
|
||||
}
|
||||
_, err := g.Pipeline.ProcessInput(ctx.Req.Context(), user.OrgId, channel, cmd.Data)
|
||||
_, err := g.Pipeline.ProcessInput(ctx.Req.Context(), user.OrgID, channel, cmd.Data)
|
||||
if err != nil {
|
||||
logger.Error("Error processing input", "user", user, "channel", channel, "error", err)
|
||||
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
|
||||
@@ -1016,13 +1016,13 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext) response.Respons
|
||||
return response.Error(code, text, nil)
|
||||
}
|
||||
if reply.Data != nil {
|
||||
err = g.Publish(ctx.OrgId, cmd.Channel, cmd.Data)
|
||||
err = g.Publish(ctx.OrgID, cmd.Channel, cmd.Data)
|
||||
if err != nil {
|
||||
logger.Error("Error publish to channel", "error", err, "channel", cmd.Channel)
|
||||
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
|
||||
}
|
||||
}
|
||||
logger.Debug("Publication successful", "user", ctx.SignedInUser.UserId, "channel", cmd.Channel)
|
||||
logger.Debug("Publication successful", "user", ctx.SignedInUser.UserID, "channel", cmd.Channel)
|
||||
return response.JSON(http.StatusOK, dtos.LivePublishResponse{})
|
||||
}
|
||||
|
||||
@@ -1035,9 +1035,9 @@ func (g *GrafanaLive) HandleListHTTP(c *models.ReqContext) response.Response {
|
||||
var channels []*managedstream.ManagedChannel
|
||||
var err error
|
||||
if g.IsHA() {
|
||||
channels, err = g.surveyCaller.CallManagedStreams(c.SignedInUser.OrgId)
|
||||
channels, err = g.surveyCaller.CallManagedStreams(c.SignedInUser.OrgID)
|
||||
} else {
|
||||
channels, err = g.ManagedStreamRunner.GetManagedChannels(c.SignedInUser.OrgId)
|
||||
channels, err = g.ManagedStreamRunner.GetManagedChannels(c.SignedInUser.OrgID)
|
||||
}
|
||||
if err != nil {
|
||||
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), err)
|
||||
@@ -1053,7 +1053,7 @@ func (g *GrafanaLive) HandleInfoHTTP(ctx *models.ReqContext) response.Response {
|
||||
path := web.Params(ctx.Req)["*"]
|
||||
if path == "grafana/dashboards/gitops" {
|
||||
return response.JSON(http.StatusOK, util.DynMap{
|
||||
"active": g.GrafanaScope.Dashboards.HasGitOpsObserver(ctx.SignedInUser.OrgId),
|
||||
"active": g.GrafanaScope.Dashboards.HasGitOpsObserver(ctx.SignedInUser.OrgID),
|
||||
})
|
||||
}
|
||||
return response.JSONStreaming(404, util.DynMap{
|
||||
@@ -1063,7 +1063,7 @@ func (g *GrafanaLive) HandleInfoHTTP(ctx *models.ReqContext) response.Response {
|
||||
|
||||
// HandleChannelRulesListHTTP ...
|
||||
func (g *GrafanaLive) HandleChannelRulesListHTTP(c *models.ReqContext) response.Response {
|
||||
result, err := g.pipelineStorage.ListChannelRules(c.Req.Context(), c.OrgId)
|
||||
result, err := g.pipelineStorage.ListChannelRules(c.Req.Context(), c.OrgID)
|
||||
if err != nil {
|
||||
return response.Error(http.StatusInternalServerError, "Failed to get channel rules", err)
|
||||
}
|
||||
@@ -1148,7 +1148,7 @@ func (g *GrafanaLive) HandlePipelineConvertTestHTTP(c *models.ReqContext) respon
|
||||
if err != nil {
|
||||
return response.Error(http.StatusInternalServerError, "Error creating pipeline", err)
|
||||
}
|
||||
rule, ok, err := channelRuleGetter.Get(c.OrgId, req.Channel)
|
||||
rule, ok, err := channelRuleGetter.Get(c.OrgID, req.Channel)
|
||||
if err != nil {
|
||||
return response.Error(http.StatusInternalServerError, "Error getting channel rule", err)
|
||||
}
|
||||
@@ -1158,7 +1158,7 @@ func (g *GrafanaLive) HandlePipelineConvertTestHTTP(c *models.ReqContext) respon
|
||||
if rule.Converter == nil {
|
||||
return response.Error(http.StatusNotFound, "No converter found", nil)
|
||||
}
|
||||
channelFrames, err := pipe.DataToChannelFrames(c.Req.Context(), *rule, c.OrgId, req.Channel, []byte(req.Data))
|
||||
channelFrames, err := pipe.DataToChannelFrames(c.Req.Context(), *rule, c.OrgID, req.Channel, []byte(req.Data))
|
||||
if err != nil {
|
||||
return response.Error(http.StatusInternalServerError, "Error converting data", err)
|
||||
}
|
||||
@@ -1178,7 +1178,7 @@ func (g *GrafanaLive) HandleChannelRulesPostHTTP(c *models.ReqContext) response.
|
||||
if err != nil {
|
||||
return response.Error(http.StatusBadRequest, "Error decoding channel rule", err)
|
||||
}
|
||||
rule, err := g.pipelineStorage.CreateChannelRule(c.Req.Context(), c.OrgId, cmd)
|
||||
rule, err := g.pipelineStorage.CreateChannelRule(c.Req.Context(), c.OrgID, cmd)
|
||||
if err != nil {
|
||||
return response.Error(http.StatusInternalServerError, "Failed to create channel rule", err)
|
||||
}
|
||||
@@ -1201,7 +1201,7 @@ func (g *GrafanaLive) HandleChannelRulesPutHTTP(c *models.ReqContext) response.R
|
||||
if cmd.Pattern == "" {
|
||||
return response.Error(http.StatusBadRequest, "Rule pattern required", nil)
|
||||
}
|
||||
rule, err := g.pipelineStorage.UpdateChannelRule(c.Req.Context(), c.OrgId, cmd)
|
||||
rule, err := g.pipelineStorage.UpdateChannelRule(c.Req.Context(), c.OrgID, cmd)
|
||||
if err != nil {
|
||||
return response.Error(http.StatusInternalServerError, "Failed to update channel rule", err)
|
||||
}
|
||||
@@ -1224,7 +1224,7 @@ func (g *GrafanaLive) HandleChannelRulesDeleteHTTP(c *models.ReqContext) respons
|
||||
if cmd.Pattern == "" {
|
||||
return response.Error(http.StatusBadRequest, "Rule pattern required", nil)
|
||||
}
|
||||
err = g.pipelineStorage.DeleteChannelRule(c.Req.Context(), c.OrgId, cmd)
|
||||
err = g.pipelineStorage.DeleteChannelRule(c.Req.Context(), c.OrgID, cmd)
|
||||
if err != nil {
|
||||
return response.Error(http.StatusInternalServerError, "Failed to delete channel rule", err)
|
||||
}
|
||||
@@ -1244,7 +1244,7 @@ func (g *GrafanaLive) HandlePipelineEntitiesListHTTP(_ *models.ReqContext) respo
|
||||
|
||||
// HandleWriteConfigsListHTTP ...
|
||||
func (g *GrafanaLive) HandleWriteConfigsListHTTP(c *models.ReqContext) response.Response {
|
||||
backends, err := g.pipelineStorage.ListWriteConfigs(c.Req.Context(), c.OrgId)
|
||||
backends, err := g.pipelineStorage.ListWriteConfigs(c.Req.Context(), c.OrgID)
|
||||
if err != nil {
|
||||
return response.Error(http.StatusInternalServerError, "Failed to get write configs", err)
|
||||
}
|
||||
@@ -1268,7 +1268,7 @@ func (g *GrafanaLive) HandleWriteConfigsPostHTTP(c *models.ReqContext) response.
|
||||
if err != nil {
|
||||
return response.Error(http.StatusBadRequest, "Error decoding write config create command", err)
|
||||
}
|
||||
result, err := g.pipelineStorage.CreateWriteConfig(c.Req.Context(), c.OrgId, cmd)
|
||||
result, err := g.pipelineStorage.CreateWriteConfig(c.Req.Context(), c.OrgID, cmd)
|
||||
if err != nil {
|
||||
return response.Error(http.StatusInternalServerError, "Failed to create write config", err)
|
||||
}
|
||||
@@ -1291,7 +1291,7 @@ func (g *GrafanaLive) HandleWriteConfigsPutHTTP(c *models.ReqContext) response.R
|
||||
if cmd.UID == "" {
|
||||
return response.Error(http.StatusBadRequest, "UID required", nil)
|
||||
}
|
||||
existingBackend, ok, err := g.pipelineStorage.GetWriteConfig(c.Req.Context(), c.OrgId, pipeline.WriteConfigGetCmd{
|
||||
existingBackend, ok, err := g.pipelineStorage.GetWriteConfig(c.Req.Context(), c.OrgID, pipeline.WriteConfigGetCmd{
|
||||
UID: cmd.UID,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -1312,7 +1312,7 @@ func (g *GrafanaLive) HandleWriteConfigsPutHTTP(c *models.ReqContext) response.R
|
||||
}
|
||||
}
|
||||
}
|
||||
result, err := g.pipelineStorage.UpdateWriteConfig(c.Req.Context(), c.OrgId, cmd)
|
||||
result, err := g.pipelineStorage.UpdateWriteConfig(c.Req.Context(), c.OrgID, cmd)
|
||||
if err != nil {
|
||||
return response.Error(http.StatusInternalServerError, "Failed to update write config", err)
|
||||
}
|
||||
@@ -1335,7 +1335,7 @@ func (g *GrafanaLive) HandleWriteConfigsDeleteHTTP(c *models.ReqContext) respons
|
||||
if cmd.UID == "" {
|
||||
return response.Error(http.StatusBadRequest, "UID required", nil)
|
||||
}
|
||||
err = g.pipelineStorage.DeleteWriteConfig(c.Req.Context(), c.OrgId, cmd)
|
||||
err = g.pipelineStorage.DeleteWriteConfig(c.Req.Context(), c.OrgID, cmd)
|
||||
if err != nil {
|
||||
return response.Error(http.StatusInternalServerError, "Failed to delete write config", err)
|
||||
}
|
||||
|
@@ -241,7 +241,7 @@ func (s *NamespaceStream) GetHandlerForPath(_ string) (models.ChannelHandler, er
|
||||
|
||||
func (s *NamespaceStream) OnSubscribe(ctx context.Context, u *user.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
|
||||
reply := models.SubscribeReply{}
|
||||
frameJSON, ok, err := s.frameCache.GetFrame(ctx, u.OrgId, e.Channel)
|
||||
frameJSON, ok, err := s.frameCache.GetFrame(ctx, u.OrgID, e.Channel)
|
||||
if err != nil {
|
||||
return reply, 0, err
|
||||
}
|
||||
|
@@ -48,7 +48,7 @@ func (g *Gateway) Run(ctx context.Context) error {
|
||||
func (g *Gateway) Handle(ctx *models.ReqContext) {
|
||||
streamID := web.Params(ctx.Req)[":streamId"]
|
||||
|
||||
stream, err := g.GrafanaLive.ManagedStreamRunner.GetOrCreateStream(ctx.SignedInUser.OrgId, liveDto.ScopeStream, streamID)
|
||||
stream, err := g.GrafanaLive.ManagedStreamRunner.GetOrCreateStream(ctx.SignedInUser.OrgID, liveDto.ScopeStream, streamID)
|
||||
if err != nil {
|
||||
logger.Error("Error getting stream", "error", err)
|
||||
ctx.Resp.WriteHeader(http.StatusInternalServerError)
|
||||
@@ -111,7 +111,7 @@ func (g *Gateway) HandlePipelinePush(ctx *models.ReqContext) {
|
||||
"bodyLength", len(body),
|
||||
)
|
||||
|
||||
ruleFound, err := g.GrafanaLive.Pipeline.ProcessInput(ctx.Req.Context(), ctx.OrgId, channelID, body)
|
||||
ruleFound, err := g.GrafanaLive.Pipeline.ProcessInput(ctx.Req.Context(), ctx.OrgID, channelID, body)
|
||||
if err != nil {
|
||||
logger.Error("Pipeline input processing error", "error", err, "body", string(body))
|
||||
if errors.Is(err, liveDto.ErrInvalidChannelID) {
|
||||
|
@@ -71,7 +71,7 @@ func (s *PipelinePushHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request)
|
||||
"bodyLength", len(body),
|
||||
)
|
||||
|
||||
ruleFound, err := s.pipeline.ProcessInput(r.Context(), user.OrgId, channelID, body)
|
||||
ruleFound, err := s.pipeline.ProcessInput(r.Context(), user.OrgID, channelID, body)
|
||||
if err != nil {
|
||||
logger.Error("Pipeline input processing error", "error", err, "body", string(body))
|
||||
return
|
||||
|
@@ -67,7 +67,7 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
|
||||
break
|
||||
}
|
||||
|
||||
stream, err := s.managedStreamRunner.GetOrCreateStream(user.OrgId, liveDto.ScopeStream, streamID)
|
||||
stream, err := s.managedStreamRunner.GetOrCreateStream(user.OrgID, liveDto.ScopeStream, streamID)
|
||||
if err != nil {
|
||||
logger.Error("Error getting stream", "error", err)
|
||||
continue
|
||||
|
@@ -71,8 +71,8 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
|
||||
}
|
||||
|
||||
mockContextGetter.EXPECT().GetPluginContext(context.Background(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, user *user.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
require.Equal(t, int64(2), user.UserId)
|
||||
require.Equal(t, int64(1), user.OrgId)
|
||||
require.Equal(t, int64(2), user.UserID)
|
||||
require.Equal(t, int64(1), user.OrgID)
|
||||
require.Equal(t, testPluginContext.PluginID, pluginID)
|
||||
require.Equal(t, testPluginContext.DataSourceInstanceSettings.UID, datasourceUID)
|
||||
return testPluginContext, true, nil
|
||||
@@ -93,12 +93,12 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
|
||||
return ctx.Err()
|
||||
}).Times(1)
|
||||
|
||||
result, err := manager.SubmitStream(context.Background(), &user.SignedInUser{UserId: 2, OrgId: 1}, "1/test", "test", nil, testPluginContext, mockStreamRunner, false)
|
||||
result, err := manager.SubmitStream(context.Background(), &user.SignedInUser{UserID: 2, OrgID: 1}, "1/test", "test", nil, testPluginContext, mockStreamRunner, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, result.StreamExists)
|
||||
|
||||
// try submit the same.
|
||||
result, err = manager.SubmitStream(context.Background(), &user.SignedInUser{UserId: 2, OrgId: 1}, "1/test", "test", nil, backend.PluginContext{}, mockStreamRunner, false)
|
||||
result, err = manager.SubmitStream(context.Background(), &user.SignedInUser{UserID: 2, OrgID: 1}, "1/test", "test", nil, backend.PluginContext{}, mockStreamRunner, false)
|
||||
require.NoError(t, err)
|
||||
require.True(t, result.StreamExists)
|
||||
|
||||
@@ -162,12 +162,12 @@ func TestStreamManager_SubmitStream_DifferentOrgID(t *testing.T) {
|
||||
return ctx.Err()
|
||||
}).Times(1)
|
||||
|
||||
result, err := manager.SubmitStream(context.Background(), &user.SignedInUser{UserId: 2, OrgId: 1}, "1/test", "test", nil, backend.PluginContext{}, mockStreamRunner1, false)
|
||||
result, err := manager.SubmitStream(context.Background(), &user.SignedInUser{UserID: 2, OrgID: 1}, "1/test", "test", nil, backend.PluginContext{}, mockStreamRunner1, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, result.StreamExists)
|
||||
|
||||
// try submit the same channel but different orgID.
|
||||
result, err = manager.SubmitStream(context.Background(), &user.SignedInUser{UserId: 2, OrgId: 2}, "2/test", "test", nil, backend.PluginContext{}, mockStreamRunner2, false)
|
||||
result, err = manager.SubmitStream(context.Background(), &user.SignedInUser{UserID: 2, OrgID: 2}, "2/test", "test", nil, backend.PluginContext{}, mockStreamRunner2, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, result.StreamExists)
|
||||
|
||||
@@ -218,7 +218,7 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) {
|
||||
return ctx.Err()
|
||||
}).Times(1)
|
||||
|
||||
_, err := manager.SubmitStream(context.Background(), &user.SignedInUser{UserId: 2, OrgId: 1}, "1/test", "test", nil, backend.PluginContext{}, mockStreamRunner, false)
|
||||
_, err := manager.SubmitStream(context.Background(), &user.SignedInUser{UserID: 2, OrgID: 1}, "1/test", "test", nil, backend.PluginContext{}, mockStreamRunner, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
waitWithTimeout(t, startedCh, time.Second)
|
||||
@@ -254,8 +254,8 @@ func TestStreamManager_SubmitStream_ErrorRestartsRunStream(t *testing.T) {
|
||||
}
|
||||
|
||||
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, user *user.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
require.Equal(t, int64(2), user.UserId)
|
||||
require.Equal(t, int64(1), user.OrgId)
|
||||
require.Equal(t, int64(2), user.UserID)
|
||||
require.Equal(t, int64(1), user.OrgID)
|
||||
require.Equal(t, testPluginContext.PluginID, pluginID)
|
||||
require.Equal(t, testPluginContext.DataSourceInstanceSettings.UID, datasourceUID)
|
||||
return testPluginContext, true, nil
|
||||
@@ -272,7 +272,7 @@ func TestStreamManager_SubmitStream_ErrorRestartsRunStream(t *testing.T) {
|
||||
return errors.New("boom")
|
||||
}).Times(numErrors + 1)
|
||||
|
||||
result, err := manager.SubmitStream(context.Background(), &user.SignedInUser{UserId: 2, OrgId: 1}, "test", "test", nil, testPluginContext, mockStreamRunner, false)
|
||||
result, err := manager.SubmitStream(context.Background(), &user.SignedInUser{UserID: 2, OrgID: 1}, "test", "test", nil, testPluginContext, mockStreamRunner, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, result.StreamExists)
|
||||
|
||||
@@ -306,7 +306,7 @@ func TestStreamManager_SubmitStream_NilErrorStopsRunStream(t *testing.T) {
|
||||
return nil
|
||||
}).Times(1)
|
||||
|
||||
result, err := manager.SubmitStream(context.Background(), &user.SignedInUser{UserId: 2, OrgId: 1}, "test", "test", nil, backend.PluginContext{}, mockStreamRunner, false)
|
||||
result, err := manager.SubmitStream(context.Background(), &user.SignedInUser{UserID: 2, OrgID: 1}, "test", "test", nil, backend.PluginContext{}, mockStreamRunner, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, result.StreamExists)
|
||||
waitWithTimeout(t, result.CloseNotify, time.Second)
|
||||
@@ -337,8 +337,8 @@ func TestStreamManager_HandleDatasourceUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, user *user.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
require.Equal(t, int64(2), user.UserId)
|
||||
require.Equal(t, int64(1), user.OrgId)
|
||||
require.Equal(t, int64(2), user.UserID)
|
||||
require.Equal(t, int64(1), user.OrgID)
|
||||
require.Equal(t, testPluginContext.PluginID, pluginID)
|
||||
require.Equal(t, testPluginContext.DataSourceInstanceSettings.UID, datasourceUID)
|
||||
return testPluginContext, true, nil
|
||||
@@ -365,7 +365,7 @@ func TestStreamManager_HandleDatasourceUpdate(t *testing.T) {
|
||||
return nil
|
||||
}).Times(2)
|
||||
|
||||
result, err := manager.SubmitStream(context.Background(), &user.SignedInUser{UserId: 2, OrgId: 1}, "test", "test", nil, testPluginContext, mockStreamRunner, false)
|
||||
result, err := manager.SubmitStream(context.Background(), &user.SignedInUser{UserID: 2, OrgID: 1}, "test", "test", nil, testPluginContext, mockStreamRunner, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, result.StreamExists)
|
||||
|
||||
@@ -403,8 +403,8 @@ func TestStreamManager_HandleDatasourceDelete(t *testing.T) {
|
||||
}
|
||||
|
||||
mockContextGetter.EXPECT().GetPluginContext(context.Background(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, user *user.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
||||
require.Equal(t, int64(2), user.UserId)
|
||||
require.Equal(t, int64(1), user.OrgId)
|
||||
require.Equal(t, int64(2), user.UserID)
|
||||
require.Equal(t, int64(1), user.OrgID)
|
||||
require.Equal(t, testPluginContext.PluginID, pluginID)
|
||||
require.Equal(t, testPluginContext.DataSourceInstanceSettings.UID, datasourceUID)
|
||||
return testPluginContext, true, nil
|
||||
@@ -421,7 +421,7 @@ func TestStreamManager_HandleDatasourceDelete(t *testing.T) {
|
||||
return ctx.Err()
|
||||
}).Times(1)
|
||||
|
||||
result, err := manager.SubmitStream(context.Background(), &user.SignedInUser{UserId: 2, OrgId: 1}, "test", "test", nil, testPluginContext, mockStreamRunner, false)
|
||||
result, err := manager.SubmitStream(context.Background(), &user.SignedInUser{UserID: 2, OrgID: 1}, "test", "test", nil, testPluginContext, mockStreamRunner, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, result.StreamExists)
|
||||
|
||||
|
Reference in New Issue
Block a user