live: better comment, code style, show disconnect reason (#33430)

This commit is contained in:
Alexander Emelin 2021-04-27 18:01:12 +03:00 committed by GitHub
parent 3725654b38
commit 50795bc760
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -57,7 +57,11 @@ type CoreGrafanaScope struct {
Dashboards models.DashboardActivityChannel
}
// GrafanaLive pretends to be the server
// GrafanaLive manages live real-time connections to Grafana (over WebSocket at this moment).
// The main concept here is Channel. Connections can subscribe to many channels. Each channel
// can have different permissions and properties but once a connection subscribed to a channel
// it starts receiving all messages published into this channel. Thus GrafanaLive is a PUB/SUB
// server.
type GrafanaLive struct {
PluginContextProvider *plugincontext.Provider `inject:""`
Cfg *setting.Cfg `inject:""`
@ -160,41 +164,41 @@ func (g *GrafanaLive) Init() error {
logger.Debug("Client wants to subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
user, ok := livecontext.GetContextSignedUser(client.Context())
if !ok {
logger.Error("Unauthenticated live connection")
logger.Error("No user found in context", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorInternal)
return
}
handler, addr, err := g.GetChannelHandler(user, e.Channel)
if err != nil {
logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
cb(centrifuge.SubscribeReply{}, err)
} else {
reply, status, err := handler.OnSubscribe(client.Context(), user, models.SubscribeEvent{
Channel: e.Channel,
Path: addr.Path,
})
if err != nil {
logger.Error("Error calling channel handler subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorInternal)
return
}
if status != backend.SubscribeStreamStatusOK {
// using HTTP error codes for WS errors too.
code, text := subscribeStatusToHTTPError(status)
logger.Debug("Return custom subscribe error", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "code", code)
cb(centrifuge.SubscribeReply{}, &centrifuge.Error{Code: uint32(code), Message: text})
return
}
logger.Debug("Client subscribed", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
cb(centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
Presence: reply.Presence,
JoinLeave: reply.JoinLeave,
Recover: reply.Recover,
Data: reply.Data,
},
}, nil)
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorInternal)
return
}
reply, status, err := handler.OnSubscribe(client.Context(), user, models.SubscribeEvent{
Channel: e.Channel,
Path: addr.Path,
})
if err != nil {
logger.Error("Error calling channel handler subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorInternal)
return
}
if status != backend.SubscribeStreamStatusOK {
// using HTTP error codes for WS errors too.
code, text := subscribeStatusToHTTPError(status)
logger.Debug("Return custom subscribe error", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "code", code)
cb(centrifuge.SubscribeReply{}, &centrifuge.Error{Code: uint32(code), Message: text})
return
}
logger.Debug("Client subscribed", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
cb(centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
Presence: reply.Presence,
JoinLeave: reply.JoinLeave,
Recover: reply.Recover,
Data: reply.Data,
},
}, nil)
})
// Called when a client publishes to the websocket channel.
@ -204,54 +208,60 @@ func (g *GrafanaLive) Init() error {
logger.Debug("Client wants to publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
user, ok := livecontext.GetContextSignedUser(client.Context())
if !ok {
logger.Error("Unauthenticated live connection")
logger.Error("No user found in context", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
cb(centrifuge.PublishReply{}, centrifuge.ErrorInternal)
return
}
handler, addr, err := g.GetChannelHandler(user, e.Channel)
if err != nil {
logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
cb(centrifuge.PublishReply{}, err)
} else {
reply, status, err := handler.OnPublish(client.Context(), user, models.PublishEvent{
Channel: e.Channel,
Path: addr.Path,
Data: e.Data,
})
cb(centrifuge.PublishReply{}, centrifuge.ErrorInternal)
return
}
reply, status, err := handler.OnPublish(client.Context(), user, models.PublishEvent{
Channel: e.Channel,
Path: addr.Path,
Data: e.Data,
})
if err != nil {
logger.Error("Error calling channel handler publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
cb(centrifuge.PublishReply{}, centrifuge.ErrorInternal)
return
}
if status != backend.PublishStreamStatusOK {
// using HTTP error codes for WS errors too.
code, text := publishStatusToHTTPError(status)
logger.Debug("Return custom publish error", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "code", code)
cb(centrifuge.PublishReply{}, &centrifuge.Error{Code: uint32(code), Message: text})
return
}
centrifugeReply := centrifuge.PublishReply{
Options: centrifuge.PublishOptions{
HistorySize: reply.HistorySize,
HistoryTTL: reply.HistoryTTL,
},
}
if reply.Data != nil {
// If data is not nil then we published it manually and tell Centrifuge
// publication result so Centrifuge won't publish itself.
result, err := g.node.Publish(e.Channel, reply.Data)
if err != nil {
logger.Error("Error calling channel handler publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
logger.Error("Error publishing", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err, "data", string(reply.Data))
cb(centrifuge.PublishReply{}, centrifuge.ErrorInternal)
return
}
if status != backend.PublishStreamStatusOK {
// using HTTP error codes for WS errors too.
code, text := publishStatusToHTTPError(status)
logger.Debug("Return custom publish error", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "code", code)
cb(centrifuge.PublishReply{}, &centrifuge.Error{Code: uint32(code), Message: text})
return
}
centrifugeReply := centrifuge.PublishReply{
Options: centrifuge.PublishOptions{
HistorySize: reply.HistorySize,
HistoryTTL: reply.HistoryTTL,
},
}
if reply.Data != nil {
result, err := g.node.Publish(e.Channel, reply.Data)
if err != nil {
logger.Error("Error publishing", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
cb(centrifuge.PublishReply{}, centrifuge.ErrorInternal)
return
}
centrifugeReply.Result = &result
}
logger.Debug("Publication successful", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
cb(centrifugeReply, nil)
centrifugeReply.Result = &result
}
logger.Debug("Publication successful", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
cb(centrifugeReply, nil)
})
client.OnDisconnect(func(_ centrifuge.DisconnectEvent) {
logger.Debug("Client disconnected", "user", client.UserID(), "client", client.ID(), "elapsed", time.Since(connectedAt))
client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
reason := "normal"
if e.Disconnect != nil {
reason = e.Disconnect.Reason
}
logger.Debug("Client disconnected", "user", client.UserID(), "client", client.ID(), "reason", reason, "elapsed", time.Since(connectedAt))
})
})