Merge pull request #185 from mattermost/mm-1348

fixes mm-1348 removing dependency on redis
This commit is contained in:
Corey Hulen
2015-07-17 14:45:50 -08:00
14 changed files with 36 additions and 217 deletions

View File

@@ -8,7 +8,6 @@ import (
"fmt"
"github.com/gorilla/mux"
"github.com/mattermost/platform/model"
"github.com/mattermost/platform/store"
"net/http"
"strings"
)
@@ -542,7 +541,7 @@ func updateLastViewedAt(c *Context, w http.ResponseWriter, r *http.Request) {
message := model.NewMessage(c.Session.TeamId, id, c.Session.UserId, model.ACTION_VIEWED)
message.Add("channel_id", id)
store.PublishAndForget(message)
PublishAndForget(message)
result := make(map[string]string)
result["id"] = id
@@ -657,7 +656,7 @@ func addChannelMember(c *Context, w http.ResponseWriter, r *http.Request) {
message := model.NewMessage(c.Session.TeamId, "", userId, model.ACTION_USER_ADDED)
store.PublishAndForget(message)
PublishAndForget(message)
<-Srv.Store.Channel().UpdateLastViewedAt(id, oUser.Id)
w.Write([]byte(cm.ToJson()))

View File

@@ -27,8 +27,6 @@ var commands = []commandHandler{
func InitCommand(r *mux.Router) {
l4g.Debug("Initializing command api routes")
r.Handle("/command", ApiUserRequired(command)).Methods("POST")
hub.Start()
}
func command(c *Context, w http.ResponseWriter, r *http.Request) {

View File

@@ -11,10 +11,10 @@ import (
"github.com/mattermost/platform/store"
"github.com/mattermost/platform/utils"
"net/http"
"path/filepath"
"strconv"
"strings"
"time"
"path/filepath"
)
func InitPost(r *mux.Router) {
@@ -450,7 +450,7 @@ func fireAndForgetNotifications(post *model.Post, teamId, teamUrl string) {
message.Add("mentions", model.ArrayToJson(mentionedUsers))
}
store.PublishAndForget(message)
PublishAndForget(message)
}()
}
@@ -516,7 +516,7 @@ func updatePost(c *Context, w http.ResponseWriter, r *http.Request) {
message.Add("channel_id", rpost.ChannelId)
message.Add("message", rpost.Message)
store.PublishAndForget(message)
PublishAndForget(message)
w.Write([]byte(rpost.ToJson()))
}
@@ -666,7 +666,7 @@ func deletePost(c *Context, w http.ResponseWriter, r *http.Request) {
message.Add("post_id", post.Id)
message.Add("channel_id", post.ChannelId)
store.PublishAndForget(message)
PublishAndForget(message)
result := make(map[string]string)
result["id"] = postId

View File

@@ -28,7 +28,6 @@ func NewServer() {
Srv = &Server{}
Srv.Server = manners.NewServer()
Srv.Store = store.NewSqlStore()
store.RedisClient()
Srv.Router = mux.NewRouter()
Srv.Router.NotFoundHandler = http.HandlerFunc(Handle404)
@@ -54,7 +53,7 @@ func StopServer() {
Srv.Server.Shutdown <- true
Srv.Store.Close()
store.RedisClose()
hub.Stop()
l4g.Info("Server stopped")
}

View File

@@ -196,7 +196,7 @@ func CreateUser(c *Context, team *model.Team, user *model.User) *model.User {
// This message goes to every channel, so the channelId is irrelevant
message := model.NewMessage(team.Id, "", ruser.Id, model.ACTION_NEW_USER)
store.PublishAndForget(message)
PublishAndForget(message)
return ruser
}

View File

@@ -70,7 +70,7 @@ func (c *WebConn) readPump() {
} else {
msg.TeamId = c.TeamId
msg.UserId = c.UserId
store.PublishAndForget(&msg)
PublishAndForget(&msg)
}
}
}

View File

@@ -5,12 +5,14 @@ package api
import (
l4g "code.google.com/p/log4go"
"github.com/mattermost/platform/model"
)
type Hub struct {
teamHubs map[string]*TeamHub
register chan *WebConn
unregister chan *WebConn
broadcast chan *model.Message
stop chan string
}
@@ -18,9 +20,16 @@ var hub = &Hub{
register: make(chan *WebConn),
unregister: make(chan *WebConn),
teamHubs: make(map[string]*TeamHub),
broadcast: make(chan *model.Message),
stop: make(chan string),
}
func PublishAndForget(message *model.Message) {
go func() {
hub.Broadcast(message)
}()
}
func (h *Hub) Register(webConn *WebConn) {
h.register <- webConn
}
@@ -29,8 +38,14 @@ func (h *Hub) Unregister(webConn *WebConn) {
h.unregister <- webConn
}
func (h *Hub) Stop(teamId string) {
h.stop <- teamId
func (h *Hub) Broadcast(message *model.Message) {
if message != nil {
h.broadcast <- message
}
}
func (h *Hub) Stop() {
h.stop <- "all"
}
func (h *Hub) Start() {
@@ -53,18 +68,17 @@ func (h *Hub) Start() {
if nh, ok := h.teamHubs[c.TeamId]; ok {
nh.Unregister(c)
}
case s := <-h.stop:
if len(s) == 0 {
l4g.Debug("stopping all connections")
for _, v := range h.teamHubs {
v.Stop()
}
return
} else if nh, ok := h.teamHubs[s]; ok {
delete(h.teamHubs, s)
nh.Stop()
case msg := <-h.broadcast:
nh := h.teamHubs[msg.TeamId]
if nh != nil {
nh.broadcast <- msg
}
case s := <-h.stop:
l4g.Debug("stopping %v connections", s)
for _, v := range h.teamHubs {
v.Stop()
}
return
}
}
}()

View File

@@ -115,9 +115,6 @@ func TestSocket(t *testing.T) {
}()
time.Sleep(2 * time.Second)
hub.Stop(team.Id)
}
func TestZZWebSocketTearDown(t *testing.T) {

View File

@@ -6,8 +6,6 @@ package api
import (
l4g "code.google.com/p/log4go"
"github.com/mattermost/platform/model"
"github.com/mattermost/platform/store"
"strings"
)
type TeamHub struct {
@@ -43,43 +41,6 @@ func (h *TeamHub) Stop() {
}
func (h *TeamHub) Start() {
pubsub := store.RedisClient().PubSub()
go func() {
defer func() {
l4g.Debug("redis reader finished for teamId=%v", h.teamId)
hub.Stop(h.teamId)
}()
l4g.Debug("redis reader starting for teamId=%v", h.teamId)
err := pubsub.Subscribe(h.teamId)
if err != nil {
l4g.Error("Error while subscribing to redis %v %v", h.teamId, err)
return
}
for {
if payload, err := pubsub.ReceiveTimeout(REDIS_WAIT); err != nil {
if strings.Contains(err.Error(), "i/o timeout") {
if len(h.connections) == 0 {
l4g.Debug("No active connections so sending stop %v", h.teamId)
return
}
} else {
return
}
} else {
msg := store.GetMessageFromPayload(payload)
if msg != nil {
h.broadcast <- msg
}
}
}
}()
go func() {
for {
select {
@@ -110,7 +71,6 @@ func (h *TeamHub) Start() {
webCon.WebSocket.Close()
}
pubsub.Close()
return
}
}

View File

@@ -31,10 +31,6 @@
"Trace": false,
"AtRestEncryptKey": "Ya0xMrybACJ3sZZVWQC7e31h5nSDWZFS"
},
"RedisSettings": {
"DataSource": "dockerhost:6379",
"MaxOpenConns": 1000
},
"AWSSettings": {
"S3AccessKeyId": "",
"S3SecretAccessKey": "",

View File

@@ -31,10 +31,6 @@
"Trace": false,
"AtRestEncryptKey": "Ya0xMrybACJ3sZZVWQC7e31h5nSDWZFS"
},
"RedisSettings": {
"DataSource": "localhost:6379",
"MaxOpenConns": 1000
},
"AWSSettings": {
"S3AccessKeyId": "",
"S3SecretAccessKey": "",

View File

@@ -1,75 +0,0 @@
// Copyright (c) 2015 Spinpunch, Inc. All Rights Reserved.
// See License.txt for license information.
package store
import (
l4g "code.google.com/p/log4go"
"github.com/mattermost/platform/model"
"github.com/mattermost/platform/utils"
"gopkg.in/redis.v2"
"strings"
"time"
)
var client *redis.Client
func RedisClient() *redis.Client {
if client == nil {
addr := utils.Cfg.RedisSettings.DataSource
client = redis.NewTCPClient(&redis.Options{
Addr: addr,
Password: "",
DB: 0,
PoolSize: utils.Cfg.RedisSettings.MaxOpenConns,
})
l4g.Info("Pinging redis at '%v'", addr)
pong, err := client.Ping().Result()
if err != nil {
l4g.Critical("Failed to open redis connection to '%v' err:%v", addr, err)
time.Sleep(time.Second)
panic("Failed to open redis connection " + err.Error())
}
if pong != "PONG" {
l4g.Critical("Failed to ping redis connection to '%v' err:%v", addr, err)
time.Sleep(time.Second)
panic("Failed to open ping connection " + err.Error())
}
}
return client
}
func RedisClose() {
l4g.Info("Closing redis")
if client != nil {
client.Close()
client = nil
}
}
func PublishAndForget(message *model.Message) {
go func() {
c := RedisClient()
result := c.Publish(message.TeamId, message.ToJson())
if result.Err() != nil {
l4g.Error("Failed to publish message err=%v, payload=%v", result.Err(), message.ToJson())
}
}()
}
func GetMessageFromPayload(m interface{}) *model.Message {
if msg, found := m.(*redis.Message); found {
return model.MessageFromJson(strings.NewReader(msg.Payload))
} else {
return nil
}
}

View File

@@ -1,59 +0,0 @@
// Copyright (c) 2015 Spinpunch, Inc. All Rights Reserved.
// See License.txt for license information.
package store
import (
"fmt"
"github.com/mattermost/platform/model"
"github.com/mattermost/platform/utils"
"testing"
)
func TestRedis(t *testing.T) {
utils.LoadConfig("config.json")
c := RedisClient()
if c == nil {
t.Fatal("should have a valid redis connection")
}
pubsub := c.PubSub()
defer pubsub.Close()
m := model.NewMessage(model.NewId(), model.NewId(), model.NewId(), model.ACTION_TYPING)
m.Add("RootId", model.NewId())
err := pubsub.Subscribe(m.TeamId)
if err != nil {
t.Fatal(err)
}
// should be the subscribe success message
// lets gobble that up
if _, err := pubsub.Receive(); err != nil {
t.Fatal(err)
}
PublishAndForget(m)
fmt.Println("here1")
if msg, err := pubsub.Receive(); err != nil {
t.Fatal(err)
} else {
rmsg := GetMessageFromPayload(msg)
if m.TeamId != rmsg.TeamId {
t.Fatal("Ids do not match")
}
if m.Props["RootId"] != rmsg.Props["RootId"] {
t.Fatal("Ids do not match")
}
}
RedisClose()
}

View File

@@ -42,11 +42,6 @@ type SqlSettings struct {
AtRestEncryptKey string
}
type RedisSettings struct {
DataSource string
MaxOpenConns int
}
type LogSettings struct {
ConsoleEnable bool
ConsoleLevel string
@@ -113,7 +108,6 @@ type Config struct {
LogSettings LogSettings
ServiceSettings ServiceSettings
SqlSettings SqlSettings
RedisSettings RedisSettings
AWSSettings AWSSettings
ImageSettings ImageSettings
EmailSettings EmailSettings