Live: support a dashboard gitops channel (#33291)

This commit is contained in:
Ryan McKinley 2021-04-23 12:55:31 -07:00 committed by GitHub
parent 788bc2a793
commit 1dd9e9b184
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 269 additions and 65 deletions

View File

@ -413,13 +413,19 @@ func (hs *HTTPServer) registerRoutes() {
apiRoute.Post("/frontend-metrics", bind(metrics.PostFrontendMetricsCommand{}), routing.Wrap(hs.PostFrontendMetrics))
if hs.Live.IsEnabled() {
apiRoute.Post("/live/publish", bind(dtos.LivePublishCmd{}), routing.Wrap(hs.Live.HandleHTTPPublish))
apiRoute.Group("/live", func(liveRoute routing.RouteRegister) {
// the channel path is in the name
liveRoute.Post("/publish", bind(dtos.LivePublishCmd{}), routing.Wrap(hs.Live.HandleHTTPPublish))
// POST influx line protocol
apiRoute.Post("/live/push/:streamId", hs.LivePushGateway.Handle)
// POST influx line protocol
liveRoute.Post("/push/:streamId", hs.LivePushGateway.Handle)
// List available streams and fields
apiRoute.Get("/live/list", routing.Wrap(hs.Live.HandleListHTTP))
// List available streams and fields
liveRoute.Get("/list", routing.Wrap(hs.Live.HandleListHTTP))
// Some channels may have info
liveRoute.Get("/info/*", routing.Wrap(hs.Live.HandleInfoHTTP))
})
}
// short urls

View File

@ -239,6 +239,13 @@ func (hs *HTTPServer) deleteDashboard(c *models.ReqContext) response.Response {
return response.Error(500, "Failed to delete dashboard", err)
}
if hs.Live.IsEnabled() {
err := hs.Live.GrafanaScope.Dashboards.DashboardDeleted(c.ToUserDisplayDTO(), dash.Uid)
if err != nil {
hs.log.Error("Failed to broadcast delete info", "dashboard", dash.Uid, "error", err)
}
}
return response.JSON(200, util.DynMap{
"title": dash.Title,
"message": fmt.Sprintf("Dashboard %s deleted", dash.Title),
@ -292,6 +299,30 @@ func (hs *HTTPServer) PostDashboard(c *models.ReqContext, cmd models.SaveDashboa
dashSvc := dashboards.NewService(hs.SQLStore)
dashboard, err := dashSvc.SaveDashboard(dashItem, allowUiUpdate)
// Tell everyone listening that the dashboard changed
if hs.Live.IsEnabled() {
if dashboard == nil {
dashboard = dash // the original request
}
// This will broadcast all save requets only if a `gitops` observer exists.
// gitops is useful when trying to save dashboards in an environment where the user can not save
channel := hs.Live.GrafanaScope.Dashboards
liveerr := channel.DashboardSaved(c.SignedInUser.ToUserDisplayDTO(), cmd.Message, dashboard, err)
// When an error exists, but the value broadcast to a gitops listener return 202
if liveerr == nil && err != nil && channel.HasGitOpsObserver() {
return response.JSON(202, util.DynMap{
"status": "pending",
"message": "changes were broadcast to the gitops listener",
})
}
if liveerr != nil {
hs.log.Warn("unable to broadcast save event", "uid", dashboard.Uid, "error", err)
}
}
if err != nil {
return hs.dashboardSaveErrorToApiResponse(err)
}
@ -304,17 +335,6 @@ func (hs *HTTPServer) PostDashboard(c *models.ReqContext, cmd models.SaveDashboa
}
}
// Tell everyone listening that the dashboard changed
if hs.Live.IsEnabled() {
err := hs.Live.GrafanaScope.Dashboards.DashboardSaved(
dashboard.Uid,
c.UserId,
)
if err != nil {
hs.log.Warn("unable to broadcast save event", "uid", dashboard.Uid, "error", err)
}
}
if hs.Cfg.IsPanelLibraryEnabled() {
// connect library panels for this dashboard after the dashboard is stored and has an ID
err = hs.LibraryPanelService.ConnectLibraryPanelsForDashboard(c, dashboard)

View File

@ -11,6 +11,9 @@ import (
// ChannelPublisher writes data into a channel. Note that permissions are not checked.
type ChannelPublisher func(channel string, data []byte) error
// ChannelClientCount will return the number of clients for a channel
type ChannelClientCount func(channel string) (int, error)
// SubscribeEvent contains subscription data.
type SubscribeEvent struct {
Channel string
@ -63,6 +66,16 @@ type ChannelHandlerFactory interface {
// DashboardActivityChannel is a service to advertise dashboard activity
type DashboardActivityChannel interface {
DashboardSaved(uid string, userID int64) error
DashboardDeleted(uid string, userID int64) error
// Called when a dashboard is saved -- this includes the error so we can support a
// gitops workflow that knows if the value was saved to the local database or not
// in many cases all direct save requests will fail, but the request should be forwarded
// to any gitops observers
DashboardSaved(user *UserDisplayDTO, message string, dashboard *Dashboard, err error) error
// Called when a dashboard is deleted
DashboardDeleted(user *UserDisplayDTO, uid string) error
// Experimental! Indicate is GitOps is active. This really means
// someone is subscribed to the `grafana/dashboards/gitops` channel
HasGitOpsObserver() bool
}

View File

@ -195,6 +195,14 @@ func (u *SignedInUser) NameOrFallback() string {
return u.Email
}
func (u *SignedInUser) ToUserDisplayDTO() *UserDisplayDTO {
return &UserDisplayDTO{
Id: u.UserId,
Login: u.Login,
Name: u.Name,
}
}
type UpdateUserLastSeenAtCommand struct {
UserId int64
}
@ -241,6 +249,13 @@ type UserSearchHitDTO struct {
AuthModule AuthModuleConversion `json:"-"`
}
type UserDisplayDTO struct {
Id int64 `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Login string `json:"login,omitempty"`
AvatarUrl string `json:"avatarUrl"`
}
type UserIdDTO struct {
Id int64 `json:"id"`
Message string `json:"message"`

View File

@ -3,23 +3,42 @@ package features
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/guardian"
)
type actionType string
const (
ACTION_SAVED actionType = "saved"
ACTION_DELETED actionType = "deleted"
EDITING_STARTED actionType = "editing-started"
EDITING_FINISHED actionType = "editing-finished"
GITOPS_CHANNEL = "grafana/dashboard/gitops"
)
// DashboardEvent events related to dashboards
type dashboardEvent struct {
UID string `json:"uid"`
Action string `json:"action"` // saved, editing
UserID int64 `json:"userId,omitempty"`
SessionID string `json:"sessionId,omitempty"`
UID string `json:"uid"`
Action actionType `json:"action"` // saved, editing, deleted
User *models.UserDisplayDTO `json:"user,omitempty"`
SessionID string `json:"sessionId,omitempty"`
Message string `json:"message,omitempty"`
Dashboard *models.Dashboard `json:"dashboard,omitempty"`
Error string `json:"error,omitempty"`
}
// DashboardHandler manages all the `grafana/dashboard/*` channels
type DashboardHandler struct {
Publisher models.ChannelPublisher
Publisher models.ChannelPublisher
ClientCount models.ChannelClientCount
}
// GetHandlerForPath called on init
@ -28,16 +47,95 @@ func (h *DashboardHandler) GetHandlerForPath(path string) (models.ChannelHandler
}
// OnSubscribe for now allows anyone to subscribe to any dashboard
func (h *DashboardHandler) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
return models.SubscribeReply{
Presence: true,
JoinLeave: true,
}, backend.SubscribeStreamStatusOK, nil
func (h *DashboardHandler) OnSubscribe(ctx context.Context, user *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
parts := strings.Split(e.Path, "/")
if parts[0] == "gitops" {
// gitops gets all changes for everything, so lets make sure it is an admin user
if !user.HasRole(models.ROLE_ADMIN) {
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
}
return models.SubscribeReply{
Presence: true,
}, backend.SubscribeStreamStatusOK, nil
}
// make sure can view this dashboard
if len(parts) == 2 && parts[0] == "uid" {
query := models.GetDashboardQuery{Uid: parts[1], OrgId: user.OrgId}
if err := bus.Dispatch(&query); err != nil {
logger.Error("Unknown dashboard", "query", query)
return models.SubscribeReply{}, backend.SubscribeStreamStatusNotFound, nil
}
dash := query.Result
guardian := guardian.New(dash.Id, user.OrgId, user)
if canView, err := guardian.CanView(); err != nil || !canView {
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
}
return models.SubscribeReply{
Presence: true,
JoinLeave: true,
}, backend.SubscribeStreamStatusOK, nil
}
// Unknown path
logger.Error("Unknown dashboard channel", "path", e.Path)
return models.SubscribeReply{}, backend.SubscribeStreamStatusNotFound, nil
}
// OnPublish is called when someone begins to edit a dashboard
func (h *DashboardHandler) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
return models.PublishReply{}, backend.PublishStreamStatusOK, nil
func (h *DashboardHandler) OnPublish(ctx context.Context, user *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
parts := strings.Split(e.Path, "/")
if parts[0] == "gitops" {
// gitops gets all changes for everything, so lets make sure it is an admin user
if !user.HasRole(models.ROLE_ADMIN) {
return models.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil
}
// Eventually this could broadcast a message back to the dashboard saying a pull request exists
return models.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("not implemented yet")
}
// make sure can view this dashboard
if len(parts) == 2 && parts[0] == "uid" {
event := dashboardEvent{}
err := json.Unmarshal(e.Data, &event)
if err != nil || event.UID != parts[1] {
return models.PublishReply{}, backend.SubscribeStreamStatusNotFound, fmt.Errorf("bad request")
}
if event.Action != EDITING_STARTED {
// just ignore the event
return models.PublishReply{}, backend.SubscribeStreamStatusNotFound, fmt.Errorf("ignore???")
}
query := models.GetDashboardQuery{Uid: parts[1], OrgId: user.OrgId}
if err := bus.Dispatch(&query); err != nil {
logger.Error("Unknown dashboard", "query", query)
return models.PublishReply{}, backend.SubscribeStreamStatusNotFound, nil
}
guardian := guardian.New(query.Result.Id, user.OrgId, user)
canEdit, err := guardian.CanEdit()
if err != nil {
return models.PublishReply{}, backend.SubscribeStreamStatusNotFound, fmt.Errorf("internal error")
}
// Ignore edit events if the user can not edit
if !canEdit {
return models.PublishReply{}, backend.SubscribeStreamStatusNotFound, nil // NOOP
}
// Tell everyone who is editing
event.User = user.ToUserDisplayDTO()
msg, err := json.Marshal(event)
if err != nil {
return models.PublishReply{}, backend.SubscribeStreamStatusNotFound, fmt.Errorf("internal error")
}
return models.PublishReply{Data: msg}, backend.PublishStreamStatusOK, nil
}
return models.PublishReply{}, backend.SubscribeStreamStatusNotFound, nil
}
// DashboardSaved should broadcast to the appropriate stream
@ -46,27 +144,55 @@ func (h *DashboardHandler) publish(event dashboardEvent) error {
if err != nil {
return err
}
err = h.Publisher("grafana/dashboard/uid/"+event.UID, msg)
if err != nil {
return err
// Only broadcast non-error events
if event.Error == "" {
err = h.Publisher("grafana/dashboard/uid/"+event.UID, msg)
if err != nil {
return err
}
}
return h.Publisher("grafana/dashboard/changes", msg)
// Send everything to the gitops channel
return h.Publisher(GITOPS_CHANNEL, msg)
}
// DashboardSaved will broadcast to all connected dashboards
func (h *DashboardHandler) DashboardSaved(uid string, userID int64) error {
return h.publish(dashboardEvent{
UID: uid,
Action: "saved",
UserID: userID,
})
func (h *DashboardHandler) DashboardSaved(user *models.UserDisplayDTO, message string, dashboard *models.Dashboard, err error) error {
if err != nil && !h.HasGitOpsObserver() {
return nil // only broadcast if it was OK
}
msg := dashboardEvent{
UID: dashboard.Uid,
Action: ACTION_SAVED,
User: user,
Message: message,
Dashboard: dashboard,
}
if err != nil {
msg.Error = err.Error()
}
return h.publish(msg)
}
// DashboardDeleted will broadcast to all connected dashboards
func (h *DashboardHandler) DashboardDeleted(uid string, userID int64) error {
func (h *DashboardHandler) DashboardDeleted(user *models.UserDisplayDTO, uid string) error {
return h.publish(dashboardEvent{
UID: uid,
Action: "deleted",
UserID: userID,
Action: ACTION_DELETED,
User: user,
})
}
// HasGitOpsObserver will return true if anyone is listening to the `gitops` channel
func (h *DashboardHandler) HasGitOpsObserver() bool {
count, err := h.ClientCount(GITOPS_CHANNEL)
if err != nil {
logger.Error("error getting client count", "error", err)
return false
}
return count > 0
}

View File

@ -133,7 +133,8 @@ func (g *GrafanaLive) Init() error {
// Initialize the main features
dash := &features.DashboardHandler{
Publisher: g.Publish,
Publisher: g.Publish,
ClientCount: g.ClientCount,
}
g.GrafanaScope.Dashboards = dash
g.GrafanaScope.Features["dashboard"] = dash
@ -435,9 +436,18 @@ func (g *GrafanaLive) Publish(channel string, data []byte) error {
return err
}
// ClientCount returns the number of clients
func (g *GrafanaLive) ClientCount(channel string) (int, error) {
p, err := g.node.Presence(channel)
if err != nil {
return 0, err
}
return len(p.Presence), nil
}
// IsEnabled returns true if the Grafana Live feature is enabled.
func (g *GrafanaLive) IsEnabled() bool {
return g.Cfg.IsLiveEnabled()
return g != nil && g.Cfg.IsLiveEnabled()
}
func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePublishCmd) response.Response {
@ -503,3 +513,16 @@ func (g *GrafanaLive) HandleListHTTP(_ *models.ReqContext) response.Response {
info["channels"] = channels
return response.JSONStreaming(200, info)
}
// HandleInfoHTTP special http response for
func (g *GrafanaLive) HandleInfoHTTP(ctx *models.ReqContext) response.Response {
path := ctx.Params("*")
if path == "grafana/dashboards/gitops" {
return response.JSON(200, util.DynMap{
"active": g.GrafanaScope.Dashboards.HasGitOpsObserver(),
})
}
return response.JSONStreaming(404, util.DynMap{
"message": "Info is not supported for this channel",
})
}

View File

@ -1,4 +1,4 @@
import { getGrafanaLiveSrv, getLegacyAngularInjector, locationService } from '@grafana/runtime';
import { getGrafanaLiveSrv, locationService } from '@grafana/runtime';
import { getDashboardSrv } from '../../dashboard/services/DashboardSrv';
import { appEvents } from 'app/core/core';
import {
@ -16,6 +16,7 @@ import { DashboardEvent, DashboardEventAction } from './types';
import { CoreGrafanaLiveFeature } from '../scopes';
import { sessionId } from '../live';
import { ShowModalReactEvent } from '../../../types/events';
import { getBackendSrv } from 'app/core/services/backend_srv';
class DashboardWatcher {
channel?: LiveChannel<DashboardEvent>;
@ -35,18 +36,17 @@ class DashboardWatcher {
}
private sendEditingState() {
if (!this.channel?.publish) {
return;
if (this.channel && this.uid) {
getBackendSrv().post(`api/live/publish`, {
channel: this.channel.id,
data: {
sessionId,
uid: this.uid,
action: this.editing ? DashboardEventAction.EditingStarted : DashboardEventAction.EditingCanceled,
timestamp: Date.now(),
},
});
}
const msg: DashboardEvent = {
sessionId,
uid: this.uid!,
action: this.editing ? DashboardEventAction.EditingStarted : DashboardEventAction.EditingCanceled,
message: (window as any).grafanaBootData?.user?.name,
timestamp: Date.now(),
};
this.channel!.publish!(msg);
}
watch(uid: string) {
@ -58,12 +58,14 @@ class DashboardWatcher {
// Check for changes
if (uid !== this.uid) {
this.leave();
this.channel = live.getChannel({
scope: LiveChannelScope.Grafana,
namespace: 'dashboard',
path: `uid/${uid}`,
});
this.channel.getStream().subscribe(this.observer);
if (uid) {
this.channel = live.getChannel({
scope: LiveChannelScope.Grafana,
namespace: 'dashboard',
path: `uid/${uid}`,
});
this.channel.getStream().subscribe(this.observer);
}
this.uid = uid;
}
}
@ -116,8 +118,7 @@ class DashboardWatcher {
return;
}
const changeTracker = getLegacyAngularInjector().get<any>('unsavedChangesSrv').tracker;
const showPopup = this.editing || changeTracker.hasChanges();
const showPopup = this.editing; // || changeTracker.hasChanges();
if (action === DashboardEventAction.Saved) {
if (showPopup) {