Optimizing weh_hub.Start() for master (#5576)

* Optimizing weh_hub.Start()

* Optimizing weh_hub.Start()

* Optimizing weh_hub.Start()

* Adding to IsAuthenticated

* Fixing problem with IsAuthenticated
This commit is contained in:
Corey Hulen
2017-03-01 11:03:21 -05:00
committed by GitHub
parent 5df5d1fd02
commit 71e6a423e8
2 changed files with 43 additions and 26 deletions

View File

@@ -29,6 +29,7 @@ type WebConn struct {
Send chan model.WebSocketMessage
SessionToken string
SessionExpiresAt int64
Session *model.Session
UserId string
T goi18n.TranslateFunc
Locale string
@@ -148,6 +149,7 @@ func (webCon *WebConn) InvalidateCache() {
webCon.AllChannelMembers = nil
webCon.LastAllChannelMembersTime = 0
webCon.SessionExpiresAt = 0
webCon.Session = nil
}
func (webCon *WebConn) IsAuthenticated() bool {
@@ -162,11 +164,13 @@ func (webCon *WebConn) IsAuthenticated() bool {
l4g.Error(utils.T("api.websocket.invalid_session.error"), err.Error())
webCon.SessionToken = ""
webCon.SessionExpiresAt = 0
webCon.Session = nil
return false
}
webCon.SessionToken = session.Token
webCon.SessionExpiresAt = session.ExpiresAt
webCon.Session = session
}
return true
@@ -231,17 +235,23 @@ func (webCon *WebConn) ShouldSendEvent(msg *model.WebSocketEvent) bool {
}
func (webCon *WebConn) IsMemberOfTeam(teamId string) bool {
session, err := GetSession(webCon.SessionToken)
if err != nil {
l4g.Error(utils.T("api.websocket.invalid_session.error"), err.Error())
return false
} else {
member := session.GetTeamByTeamId(teamId)
if member != nil {
return true
} else {
if webCon.Session == nil {
session, err := GetSession(webCon.SessionToken)
if err != nil {
l4g.Error(utils.T("api.websocket.invalid_session.error"), err.Error())
return false
} else {
webCon.Session = session
}
}
member := webCon.Session.GetTeamByTeamId(teamId)
if member != nil {
return true
} else {
return false
}
}

View File

@@ -89,6 +89,15 @@ func HubUnregister(webConn *WebConn) {
}
func Publish(message *model.WebSocketEvent) {
if SkipTypingMessage(message) {
if metrics := einterfaces.GetMetricsInterface(); metrics != nil {
metrics.IncrementWebsocketEvent(message.Event + "_skipped")
}
return
}
if metrics := einterfaces.GetMetricsInterface(); metrics != nil {
metrics.IncrementWebsocketEvent(message.Event)
}
@@ -278,20 +287,18 @@ func (h *Hub) Start() {
}
case msg := <-h.broadcast:
if OkToSendTypingMessage(msg) {
for _, webCon := range h.connections {
if webCon.ShouldSendEvent(msg) {
select {
case webCon.Send <- msg:
default:
l4g.Error(fmt.Sprintf("webhub.broadcast: cannot send, closing websocket for userId=%v", webCon.UserId))
close(webCon.Send)
for i, webConCandidate := range h.connections {
if webConCandidate == webCon {
h.connections[i] = h.connections[len(h.connections)-1]
h.connections = h.connections[:len(h.connections)-1]
break
}
for _, webCon := range h.connections {
if webCon.ShouldSendEvent(msg) {
select {
case webCon.Send <- msg:
default:
l4g.Error(fmt.Sprintf("webhub.broadcast: cannot send, closing websocket for userId=%v", webCon.UserId))
close(webCon.Send)
for i, webConCandidate := range h.connections {
if webConCandidate == webCon {
h.connections[i] = h.connections[len(h.connections)-1]
h.connections = h.connections[:len(h.connections)-1]
break
}
}
}
@@ -331,13 +338,13 @@ func (h *Hub) Start() {
go doRecoverableStart()
}
func OkToSendTypingMessage(msg *model.WebSocketEvent) bool {
func SkipTypingMessage(msg *model.WebSocketEvent) bool {
// Only broadcast typing messages if less than 1K people in channel
if msg.Event == model.WEBSOCKET_EVENT_TYPING {
if Srv.Store.Channel().GetMemberCountFromCache(msg.Broadcast.ChannelId) > *utils.Cfg.TeamSettings.MaxNotificationsPerChannel {
return false
return true
}
}
return true
return false
}