Update split SDK to 6.0.2 to fix sync bug (#17060)

* Update split SDK to 6.0.2 to fix sync bug

* Vendor and tidy
This commit is contained in:
Joram Wilander
2021-03-04 11:16:08 -05:00
committed by GitHub
parent fa2ecad0a9
commit aba00a3cfd
150 changed files with 2500 additions and 1960 deletions

View File

@@ -9,11 +9,11 @@ import (
"github.com/splitio/go-client/v6/splitio/engine/evaluator"
"github.com/splitio/go-client/v6/splitio/engine/evaluator/impressionlabels"
impressionlistener "github.com/splitio/go-client/v6/splitio/impressionListener"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v2/provisional"
"github.com/splitio/go-split-commons/v2/storage"
"github.com/splitio/go-split-commons/v2/util"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-split-commons/v3/provisional"
"github.com/splitio/go-split-commons/v3/storage"
"github.com/splitio/go-split-commons/v3/util"
"github.com/splitio/go-toolkit/v4/logging"
)
// SplitClient is the entry-point of the split SDK.

View File

@@ -15,24 +15,24 @@ import (
"github.com/splitio/go-client/v6/splitio/engine"
"github.com/splitio/go-client/v6/splitio/engine/evaluator"
impressionlistener "github.com/splitio/go-client/v6/splitio/impressionListener"
config "github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v2/provisional"
"github.com/splitio/go-split-commons/v2/service"
"github.com/splitio/go-split-commons/v2/service/local"
"github.com/splitio/go-split-commons/v2/storage"
"github.com/splitio/go-split-commons/v2/storage/mutexmap"
"github.com/splitio/go-split-commons/v2/storage/mutexqueue"
"github.com/splitio/go-split-commons/v2/storage/redis"
"github.com/splitio/go-split-commons/v2/synchronizer"
"github.com/splitio/go-split-commons/v2/synchronizer/worker/event"
"github.com/splitio/go-split-commons/v2/synchronizer/worker/impression"
"github.com/splitio/go-split-commons/v2/synchronizer/worker/impressionscount"
"github.com/splitio/go-split-commons/v2/synchronizer/worker/metric"
"github.com/splitio/go-split-commons/v2/synchronizer/worker/segment"
"github.com/splitio/go-split-commons/v2/synchronizer/worker/split"
"github.com/splitio/go-split-commons/v2/tasks"
"github.com/splitio/go-toolkit/v3/logging"
config "github.com/splitio/go-split-commons/v3/conf"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-split-commons/v3/provisional"
"github.com/splitio/go-split-commons/v3/service"
"github.com/splitio/go-split-commons/v3/service/local"
"github.com/splitio/go-split-commons/v3/storage"
"github.com/splitio/go-split-commons/v3/storage/mutexmap"
"github.com/splitio/go-split-commons/v3/storage/mutexqueue"
"github.com/splitio/go-split-commons/v3/storage/redis"
"github.com/splitio/go-split-commons/v3/synchronizer"
"github.com/splitio/go-split-commons/v3/synchronizer/worker/event"
"github.com/splitio/go-split-commons/v3/synchronizer/worker/impression"
"github.com/splitio/go-split-commons/v3/synchronizer/worker/impressionscount"
"github.com/splitio/go-split-commons/v3/synchronizer/worker/metric"
"github.com/splitio/go-split-commons/v3/synchronizer/worker/segment"
"github.com/splitio/go-split-commons/v3/synchronizer/worker/split"
"github.com/splitio/go-split-commons/v3/tasks"
"github.com/splitio/go-toolkit/v4/logging"
)
const (
@@ -63,7 +63,7 @@ type SplitFactory struct {
cfg *conf.SplitSdkConfig
impressionListener *impressionlistener.WrapperImpressionListener
logger logging.LoggerInterface
syncManager *synchronizer.Manager
syncManager synchronizer.Manager
impressionManager provisional.ImpressionManager
}
@@ -372,17 +372,11 @@ func setupLocalhostFactory(
splitPeriod := cfg.TaskPeriods.SplitSync
readyChannel := make(chan int, 1)
splitAPI := &service.SplitAPI{SplitFetcher: local.NewFileSplitFetcher(cfg.SplitFile, logger)}
syncManager, err := synchronizer.NewSynchronizerManager(
synchronizer.NewLocal(
splitPeriod,
&service.SplitAPI{
SplitFetcher: local.NewFileSplitFetcher(cfg.SplitFile, logger),
},
splitStorage,
logger,
),
synchronizer.NewLocal(splitPeriod, splitAPI, splitStorage, logger),
logger,
config.AdvancedConfig{},
config.AdvancedConfig{StreamingEnabled: false},
nil,
splitStorage,
readyChannel,

View File

@@ -5,7 +5,7 @@ import (
"sync"
"github.com/splitio/go-client/v6/splitio/conf"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-toolkit/v4/logging"
)
// factoryInstances factory tracker instantiations

View File

@@ -9,9 +9,9 @@ import (
"strings"
"github.com/splitio/go-client/v6/splitio/engine/evaluator/impressionlabels"
"github.com/splitio/go-split-commons/v2/storage"
"github.com/splitio/go-toolkit/v3/datastructures/set"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/storage"
"github.com/splitio/go-toolkit/v4/datastructures/set"
"github.com/splitio/go-toolkit/v4/logging"
)
// InputValidation struct is responsible for cheking any input of treatment and

View File

@@ -3,9 +3,9 @@ package client
import (
"fmt"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v2/storage"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-split-commons/v3/storage"
"github.com/splitio/go-toolkit/v4/logging"
)
// SplitManager provides information of the currently stored splits

View File

@@ -10,10 +10,10 @@ import (
"strings"
impressionlistener "github.com/splitio/go-client/v6/splitio/impressionListener"
"github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-toolkit/v3/datastructures/set"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-toolkit/v3/nethelpers"
"github.com/splitio/go-split-commons/v3/conf"
"github.com/splitio/go-toolkit/v4/datastructures/set"
"github.com/splitio/go-toolkit/v4/logging"
"github.com/splitio/go-toolkit/v4/nethelpers"
)
const (

View File

@@ -3,7 +3,7 @@ package conf
import (
"strings"
"github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-split-commons/v3/conf"
)
// NormalizeSDKConf compares against SDK Config to set defaults

View File

@@ -7,7 +7,7 @@ import (
"github.com/splitio/go-client/v6/splitio/engine/evaluator/impressionlabels"
"github.com/splitio/go-client/v6/splitio/engine/grammar"
"github.com/splitio/go-client/v6/splitio/engine/hash"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-toolkit/v4/logging"
)
// Engine struct is responsible for cheking if any of the conditions of the split matches,

View File

@@ -7,11 +7,11 @@ import (
"github.com/splitio/go-client/v6/splitio/engine"
"github.com/splitio/go-client/v6/splitio/engine/evaluator/impressionlabels"
"github.com/splitio/go-client/v6/splitio/engine/grammar"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v2/storage"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-split-commons/v3/storage"
"github.com/splitio/go-toolkit/v3/injection"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-toolkit/v4/injection"
"github.com/splitio/go-toolkit/v4/logging"
)
const (

View File

@@ -2,9 +2,9 @@ package grammar
import (
"github.com/splitio/go-client/v6/splitio/engine/grammar/matchers"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/injection"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/injection"
"github.com/splitio/go-toolkit/v4/logging"
)
// Condition struct with added logic that wraps around a DTO

View File

@@ -2,7 +2,7 @@ package matchers
import (
"fmt"
"github.com/splitio/go-toolkit/v3/datastructures/set"
"github.com/splitio/go-toolkit/v4/datastructures/set"
"reflect"
)

View File

@@ -1,7 +1,7 @@
package matchers
import (
"github.com/splitio/go-toolkit/v3/datastructures/set"
"github.com/splitio/go-toolkit/v4/datastructures/set"
)
// ContainsAnyOfSetMatcher matches if the set supplied to the getTreatment is a superset of the one in the split

View File

@@ -1,7 +1,7 @@
package matchers
import (
"github.com/splitio/go-toolkit/v3/datastructures/set"
"github.com/splitio/go-toolkit/v4/datastructures/set"
)
// EqualToSetMatcher matches if the set supplied to the getTreatment is equal to the one in the split

View File

@@ -3,7 +3,7 @@ package matchers
import (
"fmt"
"github.com/splitio/go-split-commons/v2/storage"
"github.com/splitio/go-split-commons/v3/storage"
)
// InSegmentMatcher matches if the key passed is in the segment which the matcher was constructed with

View File

@@ -4,9 +4,9 @@ import (
"errors"
"fmt"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/injection"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/injection"
"github.com/splitio/go-toolkit/v4/logging"
)
const (

View File

@@ -1,7 +1,7 @@
package matchers
import (
"github.com/splitio/go-toolkit/v3/datastructures/set"
"github.com/splitio/go-toolkit/v4/datastructures/set"
)
// PartOfSetMatcher matches if the set supplied to the getTreatment is a subset of the one in the split

View File

@@ -1,7 +1,7 @@
package matchers
import (
"github.com/splitio/go-toolkit/v3/datastructures/set"
"github.com/splitio/go-toolkit/v4/datastructures/set"
)
// WhitelistMatcher matches if the key received is present in the matcher's whitelist

View File

@@ -1,9 +1,9 @@
package grammar
import (
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/injection"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/injection"
"github.com/splitio/go-toolkit/v4/logging"
)
// Split struct with added logic that wraps around a DTO

View File

@@ -1,7 +1,7 @@
package impressionlistener
import (
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v3/dtos"
)
// ILObject struct to map entire data for listener

View File

@@ -1,4 +1,4 @@
package splitio
// Version contains a string with the split sdk version
const Version = "6.0.1"
const Version = "6.0.2"

View File

@@ -1,47 +0,0 @@
package push
import (
"fmt"
"github.com/splitio/go-toolkit/v3/common"
)
// IncomingEvent struct to process every kind of notification that comes from streaming
type IncomingEvent struct {
id *string
timestamp *int64
encoding *string
data *string
name *string
clientID *string
event string
channel *string
message *string
code *int
statusCode *int
href *string
}
func (i *IncomingEvent) String() string {
return fmt.Sprintf(`Incoming event [id="%s", ts=%d, enc="%s", data="%s", name="%s", client="%s", `+
`event="%s", channel="%s", code=%d, status=%d]`,
common.StringFromRef(i.id),
common.Int64FromRef(i.timestamp),
common.StringFromRef(i.encoding),
common.StringFromRef(i.data),
common.StringFromRef(i.name),
common.StringFromRef(i.clientID),
i.event,
common.StringFromRef(i.channel),
common.IntFromRef(i.code),
common.IntFromRef(i.statusCode))
}
// Metrics dto
type Metrics struct {
Publishers int `json:"publishers"`
}
// Occupancy dto
type Occupancy struct {
Data Metrics `json:"metrics"`
}

View File

@@ -1,89 +0,0 @@
package push
import (
"encoding/json"
"fmt"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/logging"
)
// EventHandler struct
type EventHandler struct {
keeper *Keeper
parser *NotificationParser
processor *Processor
logger logging.LoggerInterface
}
// NewEventHandler builds new EventHandler
func NewEventHandler(keeper *Keeper, parser *NotificationParser, processor *Processor, logger logging.LoggerInterface) *EventHandler {
return &EventHandler{
keeper: keeper,
parser: parser,
processor: processor,
logger: logger,
}
}
func (e *EventHandler) wrapOccupancy(incomingEvent IncomingEvent) *Occupancy {
if incomingEvent.data == nil {
return nil
}
var occupancy *Occupancy
err := json.Unmarshal([]byte(*incomingEvent.data), &occupancy)
if err != nil {
return nil
}
return occupancy
}
func (e *EventHandler) wrapUpdateEvent(incomingEvent IncomingEvent) *dtos.IncomingNotification {
if incomingEvent.data == nil {
return nil
}
var incomingNotification *dtos.IncomingNotification
err := json.Unmarshal([]byte(*incomingEvent.data), &incomingNotification)
if err != nil {
e.logger.Error("cannot parse data as IncomingNotification type")
return nil
}
incomingNotification.Channel = *incomingEvent.channel
return incomingNotification
}
// HandleIncomingMessage handles incoming message from streaming
func (e *EventHandler) HandleIncomingMessage(event map[string]interface{}) {
incomingEvent := e.parser.Parse(event)
switch incomingEvent.event {
case update:
e.logger.Debug("Update event received")
incomingNotification := e.wrapUpdateEvent(incomingEvent)
if incomingNotification == nil {
e.logger.Debug("Skipping incoming notification...")
return
}
e.logger.Debug("Incoming Notification:", incomingNotification)
err := e.processor.Process(*incomingNotification)
if err != nil {
e.logger.Debug("Could not process notification", err.Error())
return
}
case occupancy:
e.logger.Debug("Presence event received")
occupancy := e.wrapOccupancy(incomingEvent)
if occupancy == nil || incomingEvent.channel == nil {
e.logger.Debug("Skipping occupancy...")
return
}
e.keeper.UpdateManagers(*incomingEvent.channel, occupancy.Data.Publishers)
return
case errorType: // TODO: Update this when logic is fully defined
e.logger.Error(fmt.Sprintf("Error received: %+v", incomingEvent))
default:
e.logger.Debug(fmt.Sprintf("Unexpected incomingEvent: %+v", incomingEvent))
e.logger.Error("Unexpected type of event received")
}
}

View File

@@ -1,10 +0,0 @@
package push
// Manager interface for Push Manager
type Manager interface {
Start()
Stop()
StartWorkers()
StopWorkers()
IsRunning() bool
}

View File

@@ -1,98 +0,0 @@
package push
import (
"strings"
"sync"
)
const (
// PublisherNotPresent there are no publishers sending data
PublisherNotPresent = iota
// PublisherAvailable there are publishers running
PublisherAvailable
)
const (
prefix = "[?occupancy=metrics.publishers]"
)
// last struct for storing the last notification
type last struct {
manager string
timestamp int64
mutex *sync.RWMutex
}
// Keeper struct
type Keeper struct {
managers map[string]int
activeRegion string
last last
publishers chan<- int
mutex *sync.RWMutex
}
// NewKeeper creates new keeper
func NewKeeper(publishers chan int) *Keeper {
last := last{
mutex: &sync.RWMutex{},
}
return &Keeper{
managers: make(map[string]int),
activeRegion: "us-east-1",
mutex: &sync.RWMutex{},
publishers: publishers,
last: last,
}
}
func (k *Keeper) cleanManagerPrefix(manager string) string {
return strings.Replace(manager, prefix, "", -1)
}
// Publishers returns the quantity of publishers for a particular manager
func (k *Keeper) Publishers(manager string) *int {
k.mutex.RLock()
defer k.mutex.RUnlock()
publisher, ok := k.managers[manager]
if ok {
return &publisher
}
return nil
}
// UpdateManagers updates current manager count
func (k *Keeper) UpdateManagers(manager string, publishers int) {
parsedManager := k.cleanManagerPrefix(manager)
k.mutex.Lock()
defer k.mutex.Unlock()
k.managers[parsedManager] = publishers
isAvailable := false
for _, publishers := range k.managers {
if publishers > 0 {
isAvailable = true
break
}
}
if !isAvailable {
k.publishers <- PublisherNotPresent
return
}
k.publishers <- PublisherAvailable
}
// LastNotification return the latest notification saved
func (k *Keeper) LastNotification() (string, int64) {
k.last.mutex.RLock()
defer k.last.mutex.RUnlock()
return k.last.manager, k.last.timestamp
}
// UpdateLastNotification updates last message received
func (k *Keeper) UpdateLastNotification(manager string, timestamp int64) {
k.last.mutex.Lock()
defer k.last.mutex.Unlock()
k.last.manager = k.cleanManagerPrefix(manager)
k.last.timestamp = timestamp
}

View File

@@ -1,64 +0,0 @@
package push
import (
"github.com/splitio/go-toolkit/v3/common"
"github.com/splitio/go-toolkit/v3/logging"
)
const (
update = "update"
errorType = "error"
occupancy = "[meta]occupancy"
)
// NotificationParser struct
type NotificationParser struct {
logger logging.LoggerInterface
}
// NewNotificationParser creates notifcation parser
func NewNotificationParser(logger logging.LoggerInterface) *NotificationParser {
return &NotificationParser{
logger: logger,
}
}
// Parse parses incoming event from streaming
func (n *NotificationParser) Parse(event map[string]interface{}) IncomingEvent {
incomingEvent := IncomingEvent{
id: common.AsStringOrNil(event["id"]),
encoding: common.AsStringOrNil(event["encoding"]),
data: common.AsStringOrNil(event["data"]),
name: common.AsStringOrNil(event["name"]),
clientID: common.AsStringOrNil(event["clientId"]),
channel: common.AsStringOrNil(event["channel"]),
message: common.AsStringOrNil(event["message"]),
href: common.AsStringOrNil(event["href"]),
}
timestamp := common.AsFloat64OrNil(event["timestamp"])
if timestamp != nil {
incomingEvent.timestamp = common.Int64Ref(int64(*timestamp))
}
code := common.AsFloat64OrNil(event["code"])
if code != nil {
incomingEvent.code = common.IntRef(int(*code))
}
statusCode := common.AsFloat64OrNil(event["statusCode"])
if statusCode != nil {
incomingEvent.statusCode = common.IntRef(int(*statusCode))
}
if incomingEvent.code != nil && incomingEvent.statusCode != nil {
incomingEvent.event = errorType
return incomingEvent
}
if incomingEvent.name != nil && *incomingEvent.name == occupancy {
incomingEvent.event = occupancy
return incomingEvent
}
incomingEvent.event = update
return incomingEvent
}

View File

@@ -1,112 +0,0 @@
package push
import (
"errors"
"fmt"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v2/storage"
"github.com/splitio/go-toolkit/v3/logging"
)
const (
segmentQueueCheck = 5000
splitQueueCheck = 5000
streamingPausedType = "STREAMING_PAUSED"
streamingResumedType = "STREAMING_RESUMED"
streamingDisabledType = "STREAMING_DISABLED"
)
const (
// StreamingPaused The SDK should stop processing incoming UPDATE-type events
streamingPaused = iota
// StreamingResumed The SDK should resume processing UPDATE-type events (if not already)
streamingResumed
// StreamingDisabled The SDK should disable streaming completely and dont try to reconnect until the SDK is re-instantiated
streamingDisabled
)
// Processor struct for notification processor
type Processor struct {
segmentQueue chan dtos.SegmentChangeNotification
splitQueue chan dtos.SplitChangeNotification
splitStorage storage.SplitStorageProducer
controlStatus chan<- int
logger logging.LoggerInterface
}
// NewProcessor creates new processor
func NewProcessor(segmentQueue chan dtos.SegmentChangeNotification, splitQueue chan dtos.SplitChangeNotification, splitStorage storage.SplitStorageProducer, logger logging.LoggerInterface, controlStatus chan int) (*Processor, error) {
if cap(segmentQueue) < segmentQueueCheck {
return nil, errors.New("Small size of segmentQueue")
}
if cap(splitQueue) < splitQueueCheck {
return nil, errors.New("Small size of splitQueue")
}
if cap(controlStatus) < 1 {
return nil, errors.New("Small size for control chan")
}
return &Processor{
segmentQueue: segmentQueue,
splitQueue: splitQueue,
splitStorage: splitStorage,
controlStatus: controlStatus,
logger: logger,
}, nil
}
// Process takes an incoming notification and generates appropriate notifications for it.
func (p *Processor) Process(i dtos.IncomingNotification) error {
switch i.Type {
case dtos.SplitUpdate:
if i.ChangeNumber == nil {
return errors.New("ChangeNumber could not be nil, discarded")
}
splitUpdate := dtos.NewSplitChangeNotification(i.Channel, *i.ChangeNumber)
p.splitQueue <- splitUpdate
case dtos.SegmentUpdate:
if i.ChangeNumber == nil {
return errors.New("ChangeNumber could not be nil, discarded")
}
if i.SegmentName == nil {
return errors.New("SegmentName could not be nil, discarded")
}
segmentUpdate := dtos.NewSegmentChangeNotification(i.Channel, *i.ChangeNumber, *i.SegmentName)
p.segmentQueue <- segmentUpdate
case dtos.SplitKill:
if i.ChangeNumber == nil {
return errors.New("ChangeNumber could not be nil, discarded")
}
if i.SplitName == nil {
return errors.New("SplitName could not be nil, discarded")
}
if i.DefaultTreatment == nil {
return errors.New("DefaultTreatment could not be nil, discarded")
}
splitUpdate := dtos.NewSplitChangeNotification(i.Channel, *i.ChangeNumber)
p.splitStorage.KillLocally(*i.SplitName, *i.DefaultTreatment, *i.ChangeNumber)
p.splitQueue <- splitUpdate
case dtos.Control:
if i.ControlType == nil {
return errors.New("ControlType could not be nil, discarded")
}
control := dtos.NewControlNotification(i.Channel, *i.ControlType)
switch control.ControlType {
case streamingDisabledType:
p.logger.Debug("Received notification for disabling streaming")
p.controlStatus <- streamingDisabled
case streamingPausedType:
p.logger.Debug("Received notification for pausing streaming")
p.controlStatus <- streamingPaused
case streamingResumedType:
p.logger.Debug("Received notification for resuming streaming")
p.controlStatus <- streamingResumed
default:
p.logger.Debug(fmt.Sprintf("%s Unexpected type of Control Notification", control.ControlType))
}
default:
return fmt.Errorf("Unknown IncomingNotification type: %T", i)
}
return nil
}

View File

@@ -1,370 +0,0 @@
package push
import (
"errors"
"fmt"
"net/http"
"sync/atomic"
"time"
"github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v2/service"
"github.com/splitio/go-split-commons/v2/service/api/sse"
"github.com/splitio/go-split-commons/v2/storage"
"github.com/splitio/go-toolkit/v3/common"
"github.com/splitio/go-toolkit/v3/logging"
sseStatus "github.com/splitio/go-toolkit/v3/sse"
)
const (
resetTimer = 120
maxPeriod = 30 * time.Minute
)
const (
// Ready represents ready
Ready = iota
// PushIsDown there are no publishers for streaming
PushIsDown
// PushIsUp there are publishers presents
PushIsUp
// BackoffAuth backoff is running for authentication
BackoffAuth
// BackoffSSE backoff is running for connecting to stream
BackoffSSE
// TokenExpiration flag to restart push services
TokenExpiration
// StreamingPaused flag for pausing streaming
StreamingPaused
// StreamingResumed flag for resuming streaming
StreamingResumed
// StreamingDisabled flag for disabling streaming
StreamingDisabled
// Reconnect flag to reconnect
Reconnect
// NonRetriableError represents an error that will force switching to polling
NonRetriableError
)
// PushManager struct for managing push services
type PushManager struct {
authClient service.AuthClient
sseClient *sse.StreamingClient
segmentWorker *SegmentUpdateWorker
splitWorker *SplitUpdateWorker
eventHandler *EventHandler
managerStatus chan<- int
streamingStatus chan int
publishers chan int
logger logging.LoggerInterface
cancelAuthBackoff chan struct{}
cancelSSEBackoff chan struct{}
cancelTokenExpiration chan struct{}
cancelStreamingWatcher chan struct{}
control chan int
status atomic.Value
}
// NewPushManager creates new PushManager
func NewPushManager(
logger logging.LoggerInterface,
synchronizeSegmentHandler func(segmentName string, till *int64) error,
synchronizeSplitsHandler func(till *int64) error,
splitStorage storage.SplitStorage,
config *conf.AdvancedConfig,
managerStatus chan int,
authClient service.AuthClient,
) (Manager, error) {
splitQueue := make(chan dtos.SplitChangeNotification, config.SplitUpdateQueueSize)
segmentQueue := make(chan dtos.SegmentChangeNotification, config.SegmentUpdateQueueSize)
control := make(chan int, 1)
processor, err := NewProcessor(segmentQueue, splitQueue, splitStorage, logger, control)
if err != nil {
return nil, err
}
parser := NewNotificationParser(logger)
if parser == nil {
return nil, errors.New("Could not instantiate NotificationParser")
}
publishers := make(chan int, 1000)
keeper := NewKeeper(publishers)
if keeper == nil {
return nil, errors.New("Could not instantiate Keeper")
}
eventHandler := NewEventHandler(keeper, parser, processor, logger)
segmentWorker, err := NewSegmentUpdateWorker(segmentQueue, synchronizeSegmentHandler, logger)
if err != nil {
return nil, err
}
splitWorker, err := NewSplitUpdateWorker(splitQueue, synchronizeSplitsHandler, logger)
if err != nil {
return nil, err
}
streamingStatus := make(chan int, 1000)
status := atomic.Value{}
status.Store(Ready)
return &PushManager{
authClient: authClient,
sseClient: sse.NewStreamingClient(config, streamingStatus, logger),
segmentWorker: segmentWorker,
splitWorker: splitWorker,
managerStatus: managerStatus,
streamingStatus: streamingStatus,
eventHandler: eventHandler,
publishers: publishers,
logger: logger,
cancelAuthBackoff: make(chan struct{}, 1),
cancelSSEBackoff: make(chan struct{}, 1),
cancelTokenExpiration: make(chan struct{}, 1),
cancelStreamingWatcher: make(chan struct{}, 1),
control: control,
status: status,
}, nil
}
func (p *PushManager) cancelStreaming() {
p.logger.Error("Error, switching to polling")
p.managerStatus <- NonRetriableError
}
func (p *PushManager) performAuthentication(errResult chan error) *dtos.Token {
select {
case <-p.cancelAuthBackoff:
// Discarding previous msg
default:
}
tokenResult := make(chan *dtos.Token, 1)
cancelAuthBackoff := common.WithBackoffCancelling(1*time.Second, maxPeriod, func() bool {
token, err := p.authClient.Authenticate()
if err != nil {
errType, ok := err.(dtos.HTTPError)
if ok && errType.Code >= http.StatusInternalServerError {
p.managerStatus <- BackoffAuth
return false // It will continue retrying
}
errResult <- errors.New("Error authenticating")
return true
}
tokenResult <- token
return true // Result is OK, Stopping Here, no more backoff
})
defer cancelAuthBackoff()
select {
case token := <-tokenResult:
if !token.PushEnabled {
return nil
}
return token
case err := <-errResult:
p.logger.Error(err.Error())
return nil
case <-p.cancelAuthBackoff:
return nil
}
}
func (p *PushManager) connectToStreaming(errResult chan error, token string, channels []string) error {
select {
case <-p.cancelSSEBackoff:
// Discarding previous msg
default:
}
sseResult := make(chan struct{}, 1)
cancelSSEBackoff := common.WithBackoffCancelling(1*time.Second, maxPeriod, func() bool {
p.sseClient.ConnectStreaming(token, channels, p.eventHandler.HandleIncomingMessage)
status := <-p.streamingStatus
switch status {
case sseStatus.OK:
sseResult <- struct{}{}
return true
case sseStatus.ErrorInternal:
p.managerStatus <- BackoffSSE
return false // It will continue retrying
default:
errResult <- errors.New("Error connecting streaming")
return true
}
})
defer cancelSSEBackoff()
select {
case <-sseResult:
return nil
case err := <-errResult:
p.logger.Error(err.Error())
return err
case <-p.cancelSSEBackoff:
return nil
}
}
func (p *PushManager) fetchStreamingToken(errResult chan error) (string, []string, error) {
token := p.performAuthentication(errResult)
if token == nil {
return "", []string{}, errors.New("Could not perform authentication")
}
channels, err := token.ChannelList()
if err != nil {
return "", []string{}, errors.New("Could not perform authentication")
}
nextTokenExpiration, err := token.CalculateNextTokenExpiration()
if err != nil {
return "", []string{}, errors.New("Could not perform authentication")
}
go func() {
// Create timeout timer for calculating next token expiration
idleDuration := nextTokenExpiration
tokenExpirationTimer := time.NewTimer(idleDuration)
defer tokenExpirationTimer.Stop()
select {
case <-tokenExpirationTimer.C: // Timedout
p.logger.Info("Token expired")
p.managerStatus <- TokenExpiration
return
case <-p.cancelTokenExpiration:
return
}
}()
return token.Token, channels, nil
}
func (p *PushManager) streamingStatusWatcher() {
for {
select {
case status := <-p.streamingStatus: // Streaming SSE Status
switch status {
case sseStatus.ErrorKeepAlive: // On ConnectionTimedOut -> Reconnect
fallthrough
case sseStatus.ErrorInternal: // On Error >= 500 -> Reconnect
fallthrough
case sseStatus.ErrorReadingStream: // On IOF -> Reconnect
p.managerStatus <- Reconnect
default: // Whatever other errors -> Send Error to disconnect
p.cancelStreaming()
}
case publisherStatus := <-p.publishers: // Publisher Available/Not Available
switch publisherStatus {
case PublisherNotPresent:
if p.status.Load().(int) != StreamingPaused {
p.managerStatus <- PushIsDown
}
case PublisherAvailable:
if p.status.Load().(int) != StreamingPaused {
p.managerStatus <- PushIsUp
}
default:
p.logger.Debug(fmt.Sprintf("Unexpected publisher status received %d", publisherStatus))
}
case controlStatus := <-p.control:
switch controlStatus {
case streamingPaused:
p.logger.Debug("Received Pause Streaming Notification")
if p.status.Load().(int) != StreamingPaused {
p.logger.Info("Sending Pause Streaming")
p.status.Store(StreamingPaused)
p.managerStatus <- PushIsDown
}
case streamingResumed:
p.logger.Debug("Received Resume Streaming Notification")
if p.status.Load().(int) == StreamingPaused {
p.status.Store(StreamingResumed)
publishersAvailable := p.eventHandler.keeper.Publishers("control_pri")
if publishersAvailable != nil && *publishersAvailable > 0 {
p.logger.Info("Sending Resume Streaming")
p.managerStatus <- PushIsUp
}
}
case streamingDisabled:
p.logger.Info("Received Streaming Disabled Notification")
p.managerStatus <- StreamingDisabled
default:
p.logger.Debug(fmt.Sprintf("Unexpected control status received %d", controlStatus))
}
case <-p.cancelStreamingWatcher: // Stopping Watcher
return
}
}
}
func (p *PushManager) drainStatus() {
select {
case <-p.cancelStreamingWatcher: // Discarding previous msg
default:
}
select {
case <-p.cancelTokenExpiration: // Discarding previous token expiration
default:
}
}
// Start push services
func (p *PushManager) Start() {
if p.IsRunning() {
p.logger.Info("PushManager is already running, skipping Start")
return
}
p.drainStatus()
// errResult listener for fetching token and connecting to SSE
errResult := make(chan error, 1)
token, channels, err := p.fetchStreamingToken(errResult)
if err != nil {
p.cancelStreaming()
return
}
err = p.connectToStreaming(errResult, token, channels)
if err != nil {
p.cancelStreaming()
return
}
// Everything is good, starting workers
p.splitWorker.Start()
p.segmentWorker.Start()
// Sending Ready
p.managerStatus <- Ready
// Starting streaming status watcher, it will listen 1) errors in SSE, 2) publishers changes, 3) stop
go p.streamingStatusWatcher()
}
// Stop push services
func (p *PushManager) Stop() {
p.logger.Info("Stopping Push Services")
p.cancelAuthBackoff <- struct{}{}
p.cancelSSEBackoff <- struct{}{}
p.cancelTokenExpiration <- struct{}{}
p.cancelStreamingWatcher <- struct{}{}
if p.sseClient.IsRunning() {
p.sseClient.StopStreaming(true)
}
p.StopWorkers()
}
// IsRunning returns true if the services are running
func (p *PushManager) IsRunning() bool {
return p.sseClient.IsRunning() || p.splitWorker.IsRunning() || p.segmentWorker.IsRunning()
}
// StopWorkers stops workers
func (p *PushManager) StopWorkers() {
if p.splitWorker.IsRunning() {
p.splitWorker.Stop()
}
if p.segmentWorker.IsRunning() {
p.segmentWorker.Stop()
}
}
// StartWorkers starts workers
func (p *PushManager) StartWorkers() {
if !p.splitWorker.IsRunning() {
p.splitWorker.Start()
}
if !p.segmentWorker.IsRunning() {
p.segmentWorker.Start()
}
}

View File

@@ -1,76 +0,0 @@
package push
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/logging"
)
// SegmentUpdateWorker struct
type SegmentUpdateWorker struct {
activeGoroutines *sync.WaitGroup
segmentQueue chan dtos.SegmentChangeNotification
handler func(segmentName string, till *int64) error
logger logging.LoggerInterface
stop chan struct{}
running atomic.Value
}
// NewSegmentUpdateWorker creates SegmentUpdateWorker
func NewSegmentUpdateWorker(segmentQueue chan dtos.SegmentChangeNotification, handler func(segmentName string, till *int64) error, logger logging.LoggerInterface) (*SegmentUpdateWorker, error) {
if cap(segmentQueue) < 5000 {
return nil, errors.New("")
}
running := atomic.Value{}
running.Store(false)
return &SegmentUpdateWorker{
segmentQueue: segmentQueue,
handler: handler,
logger: logger,
stop: make(chan struct{}, 1),
running: running,
}, nil
}
// Start starts worker
func (s *SegmentUpdateWorker) Start() {
s.logger.Debug("Started SegmentUpdateWorker")
if s.IsRunning() {
s.logger.Debug("Segment worker is already running")
return
}
s.running.Store(true)
go func() {
for {
select {
case segmentUpdate := <-s.segmentQueue:
s.logger.Debug("Received Segment update and proceding to perform fetch")
s.logger.Debug(fmt.Sprintf("SegmentName: %s\nChangeNumber: %d", segmentUpdate.SegmentName, &segmentUpdate.ChangeNumber))
err := s.handler(segmentUpdate.SegmentName, &segmentUpdate.ChangeNumber)
if err != nil {
s.logger.Error(err)
}
case <-s.stop:
return
}
}
}()
}
// Stop stops worker
func (s *SegmentUpdateWorker) Stop() {
if s.IsRunning() {
s.stop <- struct{}{}
s.running.Store(false)
}
}
// IsRunning indicates if worker is running or not
func (s *SegmentUpdateWorker) IsRunning() bool {
return s.running.Load().(bool)
}

View File

@@ -1,77 +0,0 @@
package push
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/logging"
)
// SplitUpdateWorker struct
type SplitUpdateWorker struct {
activeGoroutines *sync.WaitGroup
splitQueue chan dtos.SplitChangeNotification
handler func(till *int64) error
logger logging.LoggerInterface
stop chan struct{}
running atomic.Value
}
// NewSplitUpdateWorker creates SplitUpdateWorker
func NewSplitUpdateWorker(splitQueue chan dtos.SplitChangeNotification, handler func(till *int64) error, logger logging.LoggerInterface) (*SplitUpdateWorker, error) {
if cap(splitQueue) < 5000 {
return nil, errors.New("")
}
running := atomic.Value{}
running.Store(false)
return &SplitUpdateWorker{
activeGoroutines: &sync.WaitGroup{},
splitQueue: splitQueue,
handler: handler,
logger: logger,
running: running,
stop: make(chan struct{}, 1),
}, nil
}
// Start starts worker
func (s *SplitUpdateWorker) Start() {
s.logger.Debug("Started SplitUpdateWorker")
if s.IsRunning() {
s.logger.Info("Split worker is already running")
return
}
s.running.Store(true)
go func() {
for {
select {
case splitUpdate := <-s.splitQueue:
s.logger.Debug("Received Split update and proceding to perform fetch")
s.logger.Debug(fmt.Sprintf("ChangeNumber: %d", splitUpdate.ChangeNumber))
err := s.handler(&splitUpdate.ChangeNumber)
if err != nil {
s.logger.Error(err)
}
case <-s.stop:
return
}
}
}()
}
// Stop stops worker
func (s *SplitUpdateWorker) Stop() {
if s.IsRunning() {
s.stop <- struct{}{}
s.running.Store(false)
}
}
// IsRunning indicates if worker is running or not
func (s *SplitUpdateWorker) IsRunning() bool {
return s.running.Load().(bool)
}

View File

@@ -1,127 +0,0 @@
package sse
import (
"strings"
"sync"
"sync/atomic"
"github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-toolkit/v3/sse"
)
const (
version = "1.1"
keepAlive = 120
)
// StreamingClient struct
type StreamingClient struct {
mutex *sync.RWMutex
sseClient *sse.SSEClient
sseStatus chan int
streamingStatus chan<- int
running atomic.Value
logger logging.LoggerInterface
stopped chan struct{}
}
// NewStreamingClient creates new SSE Client
func NewStreamingClient(cfg *conf.AdvancedConfig, streamingStatus chan int, logger logging.LoggerInterface) *StreamingClient {
sseStatus := make(chan int, 1)
sseClient, _ := sse.NewSSEClient(cfg.StreamingServiceURL, sseStatus, keepAlive, logger)
running := atomic.Value{}
running.Store(false)
return &StreamingClient{
mutex: &sync.RWMutex{},
sseClient: sseClient,
sseStatus: sseStatus,
streamingStatus: streamingStatus,
logger: logger,
running: running,
stopped: make(chan struct{}, 1),
}
}
// ConnectStreaming connects to streaming
func (s *StreamingClient) ConnectStreaming(token string, channelList []string, handleIncomingMessage func(e map[string]interface{})) {
params := make(map[string]string)
params["channels"] = strings.Join(append(channelList), ",")
params["accessToken"] = token
params["v"] = version
httpHandlerExited := make(chan struct{}, 1)
go func() {
s.sseClient.Do(params, handleIncomingMessage)
httpHandlerExited <- struct{}{}
}()
// Consume remaining message in completion signaling channel if any:
select {
case <-s.stopped:
default:
}
select {
case <-s.sseStatus:
default:
}
go func() {
defer func() { // When this goroutine exits, StopStreaming is freed
select {
case s.stopped <- struct{}{}:
default:
}
}()
for {
select {
case <-httpHandlerExited:
return
case status := <-s.sseStatus:
switch status {
case sse.OK:
s.logger.Info("SSE OK")
s.running.Store(true)
s.streamingStatus <- sse.OK
case sse.ErrorConnectToStreaming:
s.logger.Error("Error connecting to streaming")
s.streamingStatus <- sse.ErrorConnectToStreaming
case sse.ErrorKeepAlive:
s.logger.Error("Connection timed out")
s.streamingStatus <- sse.ErrorKeepAlive
case sse.ErrorOnClientCreation:
s.logger.Error("Could not create client for streaming")
s.streamingStatus <- sse.ErrorOnClientCreation
case sse.ErrorReadingStream:
s.logger.Error("Error reading streaming buffer")
s.streamingStatus <- sse.ErrorReadingStream
case sse.ErrorRequestPerformed:
s.logger.Error("Error performing request when connect to stream service")
s.streamingStatus <- sse.ErrorRequestPerformed
case sse.ErrorInternal:
s.logger.Error("Internal Error when connect to stream service")
s.streamingStatus <- sse.ErrorInternal
default:
s.logger.Error("Unexpected error occured with streaming")
s.streamingStatus <- sse.ErrorUnexpected
}
}
}
}()
}
// StopStreaming stops streaming
func (s *StreamingClient) StopStreaming(blocking bool) {
s.sseClient.Shutdown()
s.logger.Info("Stopped streaming")
s.running.Store(false)
if blocking {
<-s.stopped
}
}
// IsRunning returns true if it's running
func (s *StreamingClient) IsRunning() bool {
return s.running.Load().(bool)
}

View File

@@ -1,12 +0,0 @@
package synchronizer
// Synchronizer interface for syncing data to and from splits servers
type Synchronizer interface {
SyncAll() error
SynchronizeSplits(till *int64) error
SynchronizeSegment(segmentName string, till *int64) error
StartPeriodicFetching()
StopPeriodicFetching()
StartPeriodicDataRecording()
StopPeriodicDataRecording()
}

View File

@@ -1,184 +0,0 @@
package synchronizer
import (
"errors"
"sync/atomic"
"github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-split-commons/v2/push"
"github.com/splitio/go-split-commons/v2/service"
"github.com/splitio/go-split-commons/v2/storage"
"github.com/splitio/go-toolkit/v3/logging"
)
const (
// Ready represents ready
Ready = iota
// StreamingReady ready
StreamingReady
// Error represents some error in SSE streaming
Error
)
const (
// Idle flags
Idle = iota
// Streaming flags
Streaming
// Polling flags
Polling
)
// Manager struct
type Manager struct {
synchronizer Synchronizer
logger logging.LoggerInterface
config conf.AdvancedConfig
pushManager push.Manager
managerStatus chan int
streamingStatus chan int
status atomic.Value
}
// NewSynchronizerManager creates new sync manager
func NewSynchronizerManager(
synchronizer Synchronizer,
logger logging.LoggerInterface,
config conf.AdvancedConfig,
authClient service.AuthClient,
splitStorage storage.SplitStorage,
managerStatus chan int,
) (*Manager, error) {
if managerStatus == nil || cap(managerStatus) < 1 {
return nil, errors.New("Status channel cannot be nil nor having capacity")
}
status := atomic.Value{}
status.Store(Idle)
manager := &Manager{
synchronizer: synchronizer,
logger: logger,
config: config,
managerStatus: managerStatus,
status: status,
}
if config.StreamingEnabled {
streamingStatus := make(chan int, 1000)
pushManager, err := push.NewPushManager(logger, synchronizer.SynchronizeSegment, synchronizer.SynchronizeSplits, splitStorage, &config, streamingStatus, authClient)
if err != nil {
return nil, err
}
manager.pushManager = pushManager
manager.streamingStatus = streamingStatus
}
return manager, nil
}
// startPolling switches the manager into polling mode: the push workers are
// paused (the SSE connection itself may stay up) and periodic fetching is
// started as the fallback data source.
func (s *Manager) startPolling() {
	s.status.Store(Polling)
	s.pushManager.StopWorkers()
	s.synchronizer.StartPeriodicFetching()
}
// IsRunning returns true if is in Streaming or Polling
func (s *Manager) IsRunning() bool {
return s.status.Load().(int) != Idle
}
// Start starts synchronization through Split
func (s *Manager) Start() {
if s.IsRunning() {
s.logger.Info("Manager is already running, skipping start")
return
}
select {
case <-s.managerStatus:
// Discarding previous status before starting
default:
}
err := s.synchronizer.SyncAll()
if err != nil {
s.managerStatus <- Error
return
}
s.logger.Debug("SyncAll Ready")
s.managerStatus <- Ready
s.synchronizer.StartPeriodicDataRecording()
if s.config.StreamingEnabled {
s.logger.Info("Start Streaming")
go s.pushManager.Start()
// Listens Streaming Status
for {
status := <-s.streamingStatus
switch status {
// Backoff is running -> start polling until auth is ok
case push.BackoffAuth:
fallthrough
// Backoff is running -> start polling until sse is connected
case push.BackoffSSE:
if s.status.Load().(int) != Polling {
s.logger.Info("Start periodic polling due backoff")
s.startPolling()
}
// SSE Streaming and workers are ready
case push.Ready:
// If Ready comes eventually when Backoff is done and polling is running
if s.status.Load().(int) == Polling {
s.synchronizer.StopPeriodicFetching()
}
s.logger.Info("SSE Streaming is ready")
s.status.Store(Streaming)
go s.synchronizer.SyncAll()
case push.StreamingDisabled:
fallthrough
// NonRetriableError occurs and it will switch to polling
case push.NonRetriableError:
s.pushManager.Stop()
s.logger.Info("Start periodic polling in Streaming")
s.startPolling()
return
// Publisher sends that there is no Notification Managers available
case push.PushIsDown:
// If streaming is already running, proceeding to stop workers
// and keeping SSE running
if s.status.Load().(int) == Streaming {
s.logger.Info("Start periodic polling in Streaming")
s.startPolling()
}
// Publisher sends that there are at least one Notification Manager available
case push.PushIsUp:
// If streaming is not already running, proceeding to start workers
if s.status.Load().(int) != Streaming {
s.logger.Info("Stop periodic polling")
s.pushManager.StartWorkers()
s.synchronizer.StopPeriodicFetching()
s.status.Store(Streaming)
go s.synchronizer.SyncAll()
}
// Reconnect received due error in streaming -> reconnecting
case push.Reconnect:
fallthrough
// Token expired -> reconnecting
case push.TokenExpiration:
s.pushManager.Stop()
go s.pushManager.Start()
}
}
} else {
s.logger.Info("Start periodic polling")
s.synchronizer.StartPeriodicFetching()
s.status.Store(Polling)
}
}
// Stop stop synchronizaation through Split
func (s *Manager) Stop() {
s.logger.Info("STOPPING MANAGER TASKS")
if s.pushManager != nil && s.pushManager.IsRunning() {
s.pushManager.Stop()
}
s.synchronizer.StopPeriodicFetching()
s.synchronizer.StopPeriodicDataRecording()
s.status.Store(Idle)
}

View File

@@ -1,8 +0,0 @@
package segment
// SegmentFetcher interface
type SegmentFetcher interface {
SynchronizeSegment(name string, till *int64) error
SynchronizeSegments() error
SegmentNames() []interface{}
}

View File

@@ -1,6 +0,0 @@
package split
// SplitFetcher interface
type SplitFetcher interface {
SynchronizeSplits(till *int64) error
}

View File

@@ -1,5 +1,6 @@
package dtos
/*
const (
// SplitUpdate used when split is updated
SplitUpdate = "SPLIT_UPDATE"
@@ -147,3 +148,4 @@ func NewSplitKillNotification(channelName string, changeNumber int64, defaultTre
SplitName: splitName,
}
}
*/

View File

@@ -3,7 +3,7 @@ package provisional
import (
"sync"
"github.com/splitio/go-split-commons/v2/util"
"github.com/splitio/go-split-commons/v3/util"
)
// Key struct for mapping each key to an amount

View File

@@ -4,8 +4,8 @@ import (
"fmt"
"strings"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/provisional/hashing"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/provisional/hashing"
)
const hashKeyTemplate = "%s:%s:%s:%s:%d"

View File

@@ -3,9 +3,9 @@ package provisional
import (
"time"
"github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v2/util"
"github.com/splitio/go-split-commons/v3/conf"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-split-commons/v3/util"
)
const lastSeenCacheSize = 500000 // cache up to 500k impression hashes

View File

@@ -4,8 +4,8 @@ import (
"fmt"
"sync"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/provisional/int64cache"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/provisional/int64cache"
)
// ImpressionObserver is used to check wether an impression has been previously seen

View File

@@ -0,0 +1,13 @@
package push
// Borrowed synchronizer interface to break circular dependencies
type synchronizerInterface interface {
SyncAll(requestNoCache bool) error
SynchronizeSplits(till *int64, requestNoCache bool) error
LocalKill(splitName string, defaultTreatment string, changeNumber int64)
SynchronizeSegment(segmentName string, till *int64, requestNoCache bool) error
StartPeriodicFetching()
StopPeriodicFetching()
StartPeriodicDataRecording()
StopPeriodicDataRecording()
}

View File

@@ -0,0 +1,14 @@
package push
const (
workerStatusIdle = iota
workerStatusRunning
workerStatusShuttingDown
)
const (
pushManagerStatusIdle = iota
pushManagerStatusInitializing
pushManagerStatusRunning
pushManagerStatusShuttingDown
)

View File

@@ -0,0 +1,237 @@
package push
import (
"errors"
"fmt"
"net/http"
"sync"
"time"
"github.com/splitio/go-split-commons/v3/conf"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-split-commons/v3/service"
"github.com/splitio/go-split-commons/v3/service/api/sse"
"github.com/splitio/go-toolkit/v4/common"
"github.com/splitio/go-toolkit/v4/logging"
"github.com/splitio/go-toolkit/v4/struct/traits/lifecycle"
)
// Status update contants that will be propagated to the push manager's user
const (
StatusUp = iota
StatusDown
StatusRetryableError
StatusNonRetryableError
)
// ErrAlreadyRunning is the error to be returned when .Start() is called on an already running instance
var ErrAlreadyRunning = errors.New("push manager already running")
// ErrNotRunning is the error to be returned when .Stop() is called on a non-running instance
var ErrNotRunning = errors.New("push manager not running")
// Manager interface contains public methods for push manager
type Manager interface {
Start() error
Stop() error
StopWorkers()
StartWorkers()
}
// ManagerImpl implements the manager interface
type ManagerImpl struct {
parser NotificationParser
sseClient sse.StreamingClient
authAPI service.AuthClient
processor Processor
statusTracker StatusTracker
feedback FeedbackLoop
nextRefresh *time.Timer
refreshTokenMutex sync.Mutex
/*
running *gtSync.AtomicBool
status int32
shutdownWaiter chan struct{}
*/
lifecycle lifecycle.Manager
logger logging.LoggerInterface
}
// FeedbackLoop is a type alias for the type of chan that must be supplied for push status tobe propagated
type FeedbackLoop = chan<- int64
// NewManager constructs a new push manager
func NewManager(
logger logging.LoggerInterface,
synchronizer synchronizerInterface,
cfg *conf.AdvancedConfig,
feedbackLoop chan<- int64,
authAPI service.AuthClient,
) (*ManagerImpl, error) {
processor, err := NewProcessor(cfg.SplitUpdateQueueSize, cfg.SegmentUpdateQueueSize, synchronizer, logger)
if err != nil {
return nil, fmt.Errorf("error instantiating processor: %w", err)
}
statusTracker := NewStatusTracker(logger)
parser := &NotificationParserImpl{
logger: logger,
onSplitUpdate: processor.ProcessSplitChangeUpdate,
onSplitKill: processor.ProcessSplitKillUpdate,
onSegmentUpdate: processor.ProcessSegmentChangeUpdate,
onControlUpdate: statusTracker.HandleControl,
onOccupancyMesage: statusTracker.HandleOccupancy,
onAblyError: statusTracker.HandleAblyError,
}
manager := &ManagerImpl{
authAPI: authAPI,
sseClient: sse.NewStreamingClient(cfg, logger),
statusTracker: statusTracker,
feedback: feedbackLoop,
processor: processor,
parser: parser,
logger: logger,
}
manager.lifecycle.Setup()
return manager, nil
}
// Start initiates the authentication flow and if successful initiates a connection
func (m *ManagerImpl) Start() error {
if !m.lifecycle.BeginInitialization() {
return ErrAlreadyRunning
}
m.triggerConnectionFlow()
return nil
}
// Stop method stops the sse client and it's status monitoring goroutine
func (m *ManagerImpl) Stop() error {
if !m.lifecycle.BeginShutdown() {
return ErrNotRunning
}
m.statusTracker.NotifySSEShutdownExpected()
m.withRefreshTokenLock(func() {
if m.nextRefresh != nil {
m.nextRefresh.Stop()
}
})
m.StopWorkers()
m.sseClient.StopStreaming()
m.lifecycle.AwaitShutdownComplete()
return nil
}
// StartWorkers start the splits & segments workers
func (m *ManagerImpl) StartWorkers() {
m.processor.StartWorkers()
}
// StopWorkers stops the splits & segments workers
func (m *ManagerImpl) StopWorkers() {
m.processor.StopWorkers()
}
// performAuthentication fetches a streaming token from the auth API.
// Returns (token, nil) on success. On failure the second return value is a
// push-status code: StatusRetryableError for server-side (5xx) or transport
// errors, StatusNonRetryableError for client-side HTTP errors (4xx) or when
// the account has push disabled.
func (m *ManagerImpl) performAuthentication() (*dtos.Token, *int64) {
	token, err := m.authAPI.Authenticate()
	if err != nil {
		if errType, ok := err.(dtos.HTTPError); ok {
			if errType.Code >= http.StatusInternalServerError {
				m.logger.Error(fmt.Sprintf("Error authenticating: %s", err.Error()))
				return nil, common.Int64Ref(StatusRetryableError)
			}
			return nil, common.Int64Ref(StatusNonRetryableError) // 400, 401, etc
		}
		// Not an HTTP error, most likely a tcp/bad connection. Should retry
		return nil, common.Int64Ref(StatusRetryableError)
	}
	// A valid token for an account without push enabled cannot be used.
	if !token.PushEnabled {
		return nil, common.Int64Ref(StatusNonRetryableError)
	}
	return token, nil
}
// eventHandler parses each incoming SSE message and forwards any resulting
// status to the feedback loop; parse failures are logged and reported as a
// retryable error.
func (m *ManagerImpl) eventHandler(e sse.IncomingMessage) {
	status, parseErr := m.parser.ParseAndForward(e)
	switch {
	case status != nil:
		m.feedback <- *status
	case parseErr != nil:
		m.logger.Error("error parsing message: ", parseErr)
		m.logger.Debug("failed message: ", e)
		m.feedback <- StatusRetryableError
	}
}
// triggerConnectionFlow authenticates, connects the SSE client, and spawns a
// goroutine that monitors the SSE status channel, propagating push-status
// codes through the feedback loop. On any early failure it aborts the
// lifecycle and reports the appropriate status.
func (m *ManagerImpl) triggerConnectionFlow() {
	token, status := m.performAuthentication()
	if status != nil {
		// Auth failed: abort initialization and propagate the status.
		m.lifecycle.AbnormalShutdown()
		defer m.lifecycle.ShutdownComplete()
		m.feedback <- *status
		return
	}
	tokenList, err := token.ChannelList()
	if err != nil {
		m.logger.Error("error parsing channel list: ", err)
		m.lifecycle.AbnormalShutdown()
		defer m.lifecycle.ShutdownComplete()
		m.feedback <- StatusRetryableError
		return
	}
	m.statusTracker.Reset()
	// Buffered so the SSE client never blocks publishing status updates.
	sseStatus := make(chan int, 100)
	m.sseClient.ConnectStreaming(token.Token, sseStatus, tokenList, m.eventHandler)
	go func() {
		defer m.lifecycle.ShutdownComplete()
		if !m.lifecycle.InitializationComplete() {
			return
		}
		for {
			message := <-sseStatus
			switch message {
			case sse.StatusFirstEventOk:
				// Connection established: schedule a token refresh before the
				// current token expires, then report the push system as up.
				when, err := token.CalculateNextTokenExpiration()
				if err != nil || when <= 0 {
					m.logger.Warning("Failed to calculate next token expiration time. Defaulting to 50 minutes")
					when = 50 * time.Minute
				}
				m.withRefreshTokenLock(func() {
					m.nextRefresh = time.AfterFunc(when, func() {
						m.logger.Info("Refreshing SSE auth token.")
						m.Stop()
						m.Start()
					})
				})
				m.feedback <- StatusUp
			case sse.StatusConnectionFailed:
				m.lifecycle.AbnormalShutdown()
				m.logger.Error("SSE Connection failed")
				m.feedback <- StatusRetryableError
				return
			case sse.StatusDisconnected:
				m.logger.Debug("propagating sse disconnection event")
				// The tracker decides whether this disconnect was expected.
				status := m.statusTracker.HandleDisconnection()
				if status != nil { // connection ended unexpectedly
					m.lifecycle.AbnormalShutdown()
					m.feedback <- *status
				}
				return
			case sse.StatusUnderlyingClientInUse:
				m.lifecycle.AbnormalShutdown()
				m.logger.Error("unexpected error in streaming. Switching to polling")
				m.feedback <- StatusNonRetryableError
				return
			}
		}
	}()
}
// withRefreshTokenLock runs fn while holding the refresh-token mutex. The
// deferred unlock keeps the mutex released even if fn panics.
func (m *ManagerImpl) withRefreshTokenLock(fn func()) {
	m.refreshTokenMutex.Lock()
	defer m.refreshTokenMutex.Unlock()
	fn()
}

View File

@@ -0,0 +1,399 @@
package push
import (
"encoding/json"
"errors"
"fmt"
"strings"
"github.com/splitio/go-split-commons/v3/service/api/sse"
"github.com/splitio/go-toolkit/v4/logging"
)
// SSE event type constants
const (
SSEEventTypeSync = "sync"
SSEEventTypeMessage = "message"
SSEEventTypeError = "error"
)
// Message type constants
const (
MessageTypeUpdate = iota
MessageTypeControl
MessageTypeOccupancy
)
// Update type constants
const (
UpdateTypeSplitChange = "SPLIT_UPDATE"
UpdateTypeSplitKill = "SPLIT_KILL"
UpdateTypeSegmentChange = "SEGMENT_UPDATE"
UpdateTypeContol = "CONTROL"
)
// Control type constants
const (
ControlTypeStreamingEnabled = "STREAMING_ENABLED"
ControlTypeStreamingPaused = "STREAMING_PAUSED"
ControlTypeStreamingDisabled = "STREAMING_DISABLED"
)
const (
occupancuName = "[meta]occupancy"
occupancyPrefix = "[?occupancy=metrics.publishers]"
)
// ErrEmptyEvent indicates an event without message and event fields
var ErrEmptyEvent = errors.New("empty incoming event")
// NotificationParser interface
type NotificationParser interface {
ParseAndForward(sse.IncomingMessage) (*int64, error)
}
// NotificationParserImpl implementas the NotificationParser interface
type NotificationParserImpl struct {
logger logging.LoggerInterface
onSplitUpdate func(*SplitChangeUpdate) error
onSplitKill func(*SplitKillUpdate) error
onSegmentUpdate func(*SegmentChangeUpdate) error
onControlUpdate func(*ControlUpdate) *int64
onOccupancyMesage func(*OccupancyMessage) *int64
onAblyError func(*AblyError) *int64
}
// ParseAndForward accepts an incoming RAW event and returns a properly parsed & typed event
func (p *NotificationParserImpl) ParseAndForward(raw sse.IncomingMessage) (*int64, error) {
if raw.Event() == "" {
if raw.ID() == "" {
return nil, ErrEmptyEvent
}
// If it has ID its a sync event, which we're not using not. Ignore.
p.logger.Debug("Ignoring sync event")
return nil, nil
}
data := genericData{}
err := json.Unmarshal([]byte(raw.Data()), &data)
if err != nil {
return nil, fmt.Errorf("error parsing JSON: %w", err)
}
switch raw.Event() {
case SSEEventTypeError:
return p.parseError(&data)
case SSEEventTypeMessage:
return p.parseMessage(&data)
}
return nil, nil
}
// parseError builds an AblyError from the generic payload and hands it to the
// registered error callback, returning whatever status it produces.
func (p *NotificationParserImpl) parseError(data *genericData) (*int64, error) {
	ablyErr := &AblyError{
		code:       data.Code,
		statusCode: data.StatusCode,
		message:    data.Message,
		href:       data.Href,
		timestamp:  data.Timestamp,
	}
	return p.onAblyError(ablyErr), nil
}
// parseMessage decodes the nested JSON payload of a message event and routes
// it: occupancy metadata goes to the occupancy callback, everything else is
// treated as a data/control update.
func (p *NotificationParserImpl) parseMessage(data *genericData) (*int64, error) {
	nested := genericMessageData{}
	if err := json.Unmarshal([]byte(data.Data), &nested); err != nil {
		return nil, fmt.Errorf("error parsing message nested json data: %w", err)
	}
	if data.Name != occupancuName {
		return p.parseUpdate(data, &nested)
	}
	occupancy := &OccupancyMessage{
		BaseMessage: BaseMessage{
			timestamp: data.Timestamp,
			channel:   data.Channel,
		},
		publishers: nested.Metrics.Publishers,
	}
	return p.onOccupancyMesage(occupancy), nil
}
// parseUpdate maps a decoded update message onto its typed notification and
// invokes the matching callback. Control updates may yield a status; data
// updates only yield an error.
func (p *NotificationParserImpl) parseUpdate(data *genericData, nested *genericMessageData) (*int64, error) {
	if data == nil || nested == nil {
		return nil, errors.New("parseUpdate: data cannot be nil")
	}
	common := BaseMessage{timestamp: data.Timestamp, channel: data.Channel}
	base := BaseUpdate{BaseMessage: common, changeNumber: nested.ChangeNumber}
	switch nested.Type {
	case UpdateTypeSplitChange:
		update := &SplitChangeUpdate{BaseUpdate: base}
		return nil, p.onSplitUpdate(update)
	case UpdateTypeSplitKill:
		update := &SplitKillUpdate{BaseUpdate: base, splitName: nested.SplitName, defaultTreatment: nested.DefaultTreatment}
		return nil, p.onSplitKill(update)
	case UpdateTypeSegmentChange:
		update := &SegmentChangeUpdate{BaseUpdate: base, segmentName: nested.SegmentName}
		return nil, p.onSegmentUpdate(update)
	case UpdateTypeContol:
		update := &ControlUpdate{BaseMessage: common, controlType: nested.ControlType}
		return p.onControlUpdate(update), nil
	default:
		// TODO: log full event in debug mode
		return nil, fmt.Errorf("invalid update type: %s", nested.Type)
	}
}
// Event basic interface
type Event interface {
fmt.Stringer
EventType() string
Timestamp() int64
}
// SSESyncEvent represents an SSE Sync event with only id (used for resuming connections)
type SSESyncEvent struct {
id string
timestamp int64
}
// EventType always returns SSEEventTypeSync for SSESyncEvents
func (e *SSESyncEvent) EventType() string { return SSEEventTypeSync }
// Timestamp returns the timestamp of the event parsing
func (e *SSESyncEvent) Timestamp() int64 { return e.timestamp }
// String returns the string represenation of the event
func (e *SSESyncEvent) String() string {
return fmt.Sprintf("SSESync(id=%s,timestamp=%d)", e.id, e.timestamp)
}
// AblyError struct
type AblyError struct {
code int
statusCode int
message string
href string
timestamp int64
}
// EventType always returns SSEEventTypeError for AblyError
func (a *AblyError) EventType() string { return SSEEventTypeError }
// Code returns the error code
func (a *AblyError) Code() int { return a.code }
// StatusCode returns the status code
func (a *AblyError) StatusCode() int { return a.statusCode }
// Message returns the error message
func (a *AblyError) Message() string { return a.message }
// Href returns the documentation link
func (a *AblyError) Href() string { return a.href }
// Timestamp returns the error timestamp
func (a *AblyError) Timestamp() int64 { return a.timestamp }
// IsRetryable returns whether the error is recoverable via a push subsystem restart
func (a *AblyError) IsRetryable() bool { return a.code >= 40140 && a.code <= 40149 }
// String returns the string representation of the ably error
func (a *AblyError) String() string {
return fmt.Sprintf("AblyError(code=%d,statusCode=%d,message=%s,timestamp=%d,isRetryable=%t)",
a.code, a.statusCode, a.message, a.timestamp, a.IsRetryable())
}
// Message basic interface
type Message interface {
Event
MessageType() int64
Channel() string
}
// BaseMessage contains the basic message-specific fields and methods
type BaseMessage struct {
timestamp int64
channel string
}
// EventType always returns SSEEventTypeMessage for BaseMessage and embedding types
func (m *BaseMessage) EventType() string { return SSEEventTypeMessage }
// Timestamp returns the timestamp of the message reception
func (m *BaseMessage) Timestamp() int64 { return m.timestamp }
// Channel returns which channel the message was received in
func (m *BaseMessage) Channel() string { return m.channel }
// OccupancyMessage contains fields & methods related to ocupancy messages
type OccupancyMessage struct {
BaseMessage
publishers int64
}
// MessageType always returns MessageTypeOccupancy for Occupancy messages
func (o *OccupancyMessage) MessageType() int64 { return MessageTypeOccupancy }
// ChannelWithoutPrefix returns the original channel namem without the metadata prefix
func (o *OccupancyMessage) ChannelWithoutPrefix() string {
return strings.Replace(o.Channel(), occupancyPrefix, "", 1)
}
// Publishers returbs the amount of publishers in the current channel
func (o *OccupancyMessage) Publishers() int64 {
return o.publishers
}
// Strings returns the string representation of an occupancy message
func (o *OccupancyMessage) String() string {
return fmt.Sprintf("Occupancy(channel=%s,publishers=%d,timestamp=%d)",
o.Channel(), o.publishers, o.Timestamp())
}
// Update basic interface
type Update interface {
Message
UpdateType() string
ChangeNumber() int64
}
// BaseUpdate contains fields & methods related to update-based messages
type BaseUpdate struct {
BaseMessage
changeNumber int64
}
// MessageType alwats returns MessageType for Update messages
func (b *BaseUpdate) MessageType() int64 { return MessageTypeUpdate }
// ChangeNumber returns the changeNumber of the update
func (b *BaseUpdate) ChangeNumber() int64 { return b.changeNumber }
// SplitChangeUpdate represents a SplitChange notification generated in the split servers
type SplitChangeUpdate struct {
BaseUpdate
}
// UpdateType always returns UpdateTypeSplitChange for SplitKillUpdate messages
func (u *SplitChangeUpdate) UpdateType() string { return UpdateTypeSplitChange }
// String returns the String representation of a split change notification
func (u *SplitChangeUpdate) String() string {
return fmt.Sprintf("SplitChange(channel=%s,changeNumber=%d,timestamp=%d)",
u.Channel(), u.ChangeNumber(), u.Timestamp())
}
// SplitKillUpdate represents a SplitKill notification generated in the split servers
type SplitKillUpdate struct {
BaseUpdate
splitName string
defaultTreatment string
}
// UpdateType always returns UpdateTypeSplitKill for SplitKillUpdate messages
func (u *SplitKillUpdate) UpdateType() string { return UpdateTypeSplitKill }
// SplitName returns the name of the killed split
func (u *SplitKillUpdate) SplitName() string { return u.splitName }
// DefaultTreatment returns the last default treatment seen in the split servers for this split
func (u *SplitKillUpdate) DefaultTreatment() string { return u.defaultTreatment }
// ToSplitChangeUpdate Maps this kill notification to a split change one
func (u *SplitKillUpdate) ToSplitChangeUpdate() *SplitChangeUpdate {
return &SplitChangeUpdate{BaseUpdate: u.BaseUpdate}
}
// String returns the string representation of this update
func (u *SplitKillUpdate) String() string {
return fmt.Sprintf("SplitKill(channel=%s,changeNumber=%d,splitName=%s,defaultTreatment=%s,timestamp=%d)",
u.Channel(), u.ChangeNumber(), u.SplitName(), u.DefaultTreatment(), u.Timestamp())
}
// SegmentChangeUpdate represents a segment change notification generated in the split servers.
type SegmentChangeUpdate struct {
BaseUpdate
segmentName string
}
// UpdateType is always UpdateTypeSegmentChange for Segmet Updates
func (u *SegmentChangeUpdate) UpdateType() string { return UpdateTypeSegmentChange }
// SegmentName returns the name of the updated segment
func (u *SegmentChangeUpdate) SegmentName() string { return u.segmentName }
// String returns the string representation of a segment update notification
func (u *SegmentChangeUpdate) String() string {
return fmt.Sprintf("SegmentChange(channel=%s,changeNumber=%d,segmentName=%s,timestamp=%d",
u.Channel(), u.ChangeNumber(), u.segmentName, u.Timestamp())
}
// ControlUpdate represents a control notification generated by the split push subsystem
type ControlUpdate struct {
BaseMessage
controlType string
}
// MessageType always returns MessageTypeControl for Control messages
func (u *ControlUpdate) MessageType() int64 { return MessageTypeControl }
// ControlType returns the type of control notification received
func (u *ControlUpdate) ControlType() string { return u.controlType }
// String returns a string representation of this notification
func (u *ControlUpdate) String() string {
return fmt.Sprintf("Control(channel=%s,type=%s,timestamp=%d)",
u.Channel(), u.controlType, u.Timestamp())
}
type genericData struct {
// Error associated data
Code int `json:"code"`
StatusCode int `json:"statusCode"`
Message string `json:"message"`
Href string `json:"href"`
ClientID string `json:"clientId"`
ID string `json:"id"`
Name string `json:"name"`
Timestamp int64 `json:"timestamp"`
Encoding string `json:"encoding"`
Channel string `json:"channel"`
Data string `json:"data"`
//"id":"tO4rXGE4CX:0:0","timestamp":1612897630627,"encoding":"json","channel":"[?occupancy=metrics.publishers]control_sec","data":"{\"metrics\":{\"publishers\":0}}","name":"[meta]occupancy"}
}
type metrics struct {
Publishers int64 `json:"publishers"`
}
type genericMessageData struct {
Metrics metrics `json:"metrics"`
Type string `json:"type"`
ChangeNumber int64 `json:"changeNumber"`
SplitName string `json:"splitName"`
DefaultTreatment string `json:"defaultTreatment"`
SegmentName string `json:"segmentName"`
ControlType string `json:"controlType"`
// {\"type\":\"SPLIT_UPDATE\",\"changeNumber\":1612909342671}"}
}
// Compile-time assertions of interface requirements
var (
	_ Event = &AblyError{}

	_ Message = &OccupancyMessage{}
	_ Message = &SplitChangeUpdate{}
	_ Message = &SplitKillUpdate{}
	_ Message = &SegmentChangeUpdate{}
	_ Message = &ControlUpdate{}

	_ Update = &SplitChangeUpdate{}
	_ Update = &SplitKillUpdate{}
	_ Update = &SegmentChangeUpdate{}
)

View File

@@ -0,0 +1,107 @@
package push
import (
"errors"
"fmt"
"github.com/splitio/go-toolkit/v4/logging"
)
const (
splitQueueMinSize = 5000
segmentQueueMinSize = 5000
)
// Processor provides the interface for an update-message processor
type Processor interface {
	// ProcessSplitChangeUpdate enqueues a split-change notification for the split worker.
	ProcessSplitChangeUpdate(update *SplitChangeUpdate) error
	// ProcessSplitKillUpdate applies a local kill and then schedules a split fetch.
	ProcessSplitKillUpdate(update *SplitKillUpdate) error
	// ProcessSegmentChangeUpdate enqueues a segment-change notification for the segment worker.
	ProcessSegmentChangeUpdate(update *SegmentChangeUpdate) error
	// StartWorkers starts the split & segment update workers.
	StartWorkers()
	// StopWorkers stops the split & segment update workers.
	StopWorkers()
}
// ProcessorImpl struct for notification processor
type ProcessorImpl struct {
	segmentQueue  chan SegmentChangeUpdate // buffered queue consumed by segmentWorker
	splitQueue    chan SplitChangeUpdate   // buffered queue consumed by splitWorker
	splitWorker   *SplitUpdateWorker
	segmentWorker *SegmentUpdateWorker
	synchronizer  synchronizerInterface // used to issue local kills on split-kill notifications
	logger        logging.LoggerInterface
}
// NewProcessor creates a new processor wired with a split worker and a segment
// worker, each consuming its own buffered queue. Returns an error if either
// queue size is below the allowed minimum or a worker cannot be built.
func NewProcessor(
	splitQueueSize int64,
	segmentQueueSize int64,
	synchronizer synchronizerInterface,
	logger logging.LoggerInterface,
) (*ProcessorImpl, error) {
	// Enforce minimum queue sizes so a notification burst cannot overflow the channels.
	if segmentQueueSize < segmentQueueMinSize {
		return nil, errors.New("Small size of segmentQueue")
	}
	if splitQueueSize < splitQueueMinSize {
		return nil, errors.New("Small size of splitQueue")
	}
	splitQueue := make(chan SplitChangeUpdate, splitQueueSize)
	splitWorker, err := NewSplitUpdateWorker(splitQueue, synchronizer, logger)
	if err != nil {
		return nil, fmt.Errorf("error instantiating split worker: %w", err)
	}
	segmentQueue := make(chan SegmentChangeUpdate, segmentQueueSize)
	segmentWorker, err := NewSegmentUpdateWorker(segmentQueue, synchronizer, logger)
	if err != nil {
		// Fixed copy/paste: this error previously claimed the *split* worker failed.
		return nil, fmt.Errorf("error instantiating segment worker: %w", err)
	}
	return &ProcessorImpl{
		splitQueue:    splitQueue,
		splitWorker:   splitWorker,
		segmentQueue:  segmentQueue,
		segmentWorker: segmentWorker,
		synchronizer:  synchronizer,
		logger:        logger,
	}, nil
}
// ProcessSplitChangeUpdate accepts a split change notification and schedules a
// fetch by enqueueing it for the split worker.
func (p *ProcessorImpl) ProcessSplitChangeUpdate(update *SplitChangeUpdate) error {
	if update != nil {
		p.splitQueue <- *update
		return nil
	}
	return errors.New("split change update cannot be nil")
}
// ProcessSplitKillUpdate accepts a split kill notification, issues a local kill and schedules a fetch
func (p *ProcessorImpl) ProcessSplitKillUpdate(update *SplitKillUpdate) error {
	if update == nil {
		// Fixed copy/paste: the message previously referred to a "split change" update.
		return errors.New("split kill update cannot be nil")
	}
	// Kill the split locally right away, then schedule a fetch to pick up the
	// authoritative state from the backend.
	p.synchronizer.LocalKill(update.SplitName(), update.DefaultTreatment(), update.ChangeNumber())
	return p.ProcessSplitChangeUpdate(update.ToSplitChangeUpdate())
}
// ProcessSegmentChangeUpdate accepts a segment change notification and schedules a fetch
func (p *ProcessorImpl) ProcessSegmentChangeUpdate(update *SegmentChangeUpdate) error {
	if update == nil {
		// Fixed copy/paste: the message previously referred to a "split change" update.
		return errors.New("segment change update cannot be nil")
	}
	p.segmentQueue <- *update
	return nil
}
// StartWorkers enables split & segments workers
func (p *ProcessorImpl) StartWorkers() {
	p.splitWorker.Start()
	p.segmentWorker.Start()
}

// StopWorkers pauses split & segments workers
// (queued updates are retained; restarting resumes consumption of the same channels)
func (p *ProcessorImpl) StopWorkers() {
	p.splitWorker.Stop()
	p.segmentWorker.Stop()
}

View File

@@ -0,0 +1,82 @@
package push
import (
"errors"
"fmt"
"sync/atomic"
"github.com/splitio/go-toolkit/v4/common"
"github.com/splitio/go-toolkit/v4/logging"
"github.com/splitio/go-toolkit/v4/struct/traits/lifecycle"
)
// SegmentUpdateWorker consumes segment-change notifications from a queue and
// triggers the corresponding segment fetches.
type SegmentUpdateWorker struct {
	segmentQueue chan SegmentChangeUpdate // fed by the Processor
	sync         synchronizerInterface    // performs the actual segment fetch
	logger       logging.LoggerInterface
	lifecycle    lifecycle.Manager // start/stop handshake state
}
// NewSegmentUpdateWorker creates a SegmentUpdateWorker. The supplied queue must
// have capacity of at least 5000 (the processor-level minimum).
func NewSegmentUpdateWorker(
	segmentQueue chan SegmentChangeUpdate,
	synchronizer synchronizerInterface,
	logger logging.LoggerInterface,
) (*SegmentUpdateWorker, error) {
	if cap(segmentQueue) < 5000 {
		// Fixed: this error previously had an empty message, yielding useless diagnostics.
		return nil, errors.New("segment queue capacity must be at least 5000")
	}
	// NOTE(review): `running` is never read after the lifecycle-trait refactor;
	// kept only so the sync/atomic import stays used. Candidate for removal
	// together with the import.
	running := atomic.Value{}
	running.Store(false)
	worker := &SegmentUpdateWorker{
		segmentQueue: segmentQueue,
		sync:         synchronizer,
		logger:       logger,
	}
	worker.lifecycle.Setup()
	return worker, nil
}
// Start launches the worker goroutine, which drains the segment queue and
// triggers a targeted fetch per notification until shutdown is requested.
func (s *SegmentUpdateWorker) Start() {
	if !s.lifecycle.BeginInitialization() {
		s.logger.Info("Segment worker is already running")
		return
	}
	go func() {
		// Fixed ordering: defer BEFORE checking InitializationComplete, mirroring
		// SplitUpdateWorker.Start. Previously an early return here skipped
		// ShutdownComplete, which could leave Stop() blocked in
		// AwaitShutdownComplete forever.
		defer s.lifecycle.ShutdownComplete()
		if !s.lifecycle.InitializationComplete() {
			return
		}
		for {
			select {
			case segmentUpdate := <-s.segmentQueue:
				s.logger.Debug("Received Segment update and proceding to perform fetch")
				s.logger.Debug(fmt.Sprintf("SegmentName: %s\nChangeNumber: %d", segmentUpdate.SegmentName(), segmentUpdate.ChangeNumber()))
				// Fetch up to the notified change number; `true` requests a cache bypass.
				err := s.sync.SynchronizeSegment(segmentUpdate.SegmentName(), common.Int64Ref(segmentUpdate.ChangeNumber()), true)
				if err != nil {
					s.logger.Error(err)
				}
			case <-s.lifecycle.ShutdownRequested():
				return
			}
		}
	}()
}
// Stop requests shutdown of the worker goroutine and blocks until it finishes.
func (s *SegmentUpdateWorker) Stop() {
	if !s.lifecycle.BeginShutdown() {
		// Fixed log message: it previously said "Split worker not runnning".
		s.logger.Debug("Segment worker not running. Ignoring.")
		return
	}
	s.lifecycle.AwaitShutdownComplete()
}
// IsRunning indicates if worker is running or not
func (s *SegmentUpdateWorker) IsRunning() bool {
	// Delegates to the lifecycle trait's running flag.
	return s.lifecycle.IsRunning()
}

View File

@@ -0,0 +1,80 @@
package push
import (
"errors"
"fmt"
"github.com/splitio/go-toolkit/v4/common"
"github.com/splitio/go-toolkit/v4/logging"
"github.com/splitio/go-toolkit/v4/struct/traits/lifecycle"
)
// SplitUpdateWorker consumes split-change notifications from a queue and
// triggers the corresponding split fetches.
type SplitUpdateWorker struct {
	splitQueue chan SplitChangeUpdate // fed by the Processor
	sync       synchronizerInterface  // performs the actual split fetch
	logger     logging.LoggerInterface
	lifecycle  lifecycle.Manager // start/stop handshake state
}
// NewSplitUpdateWorker creates a SplitUpdateWorker. The supplied queue must
// have capacity of at least 5000 (the processor-level minimum).
func NewSplitUpdateWorker(
	splitQueue chan SplitChangeUpdate,
	synchronizer synchronizerInterface,
	logger logging.LoggerInterface,
) (*SplitUpdateWorker, error) {
	if cap(splitQueue) < 5000 {
		// Fixed: this error previously had an empty message, yielding useless diagnostics.
		return nil, errors.New("split queue capacity must be at least 5000")
	}
	worker := &SplitUpdateWorker{
		splitQueue: splitQueue,
		sync:       synchronizer,
		logger:     logger,
	}
	worker.lifecycle.Setup()
	return worker, nil
}
// Start launches the worker goroutine, which drains the split queue and
// triggers a fetch per notification until shutdown is requested.
func (s *SplitUpdateWorker) Start() {
	if !s.lifecycle.BeginInitialization() {
		s.logger.Info("Split worker is already running")
		return
	}
	s.logger.Debug("Started SplitUpdateWorker")
	go func() {
		// Deferred before the InitializationComplete check so an aborted start
		// still completes the lifecycle shutdown handshake.
		defer s.lifecycle.ShutdownComplete()
		if !s.lifecycle.InitializationComplete() {
			return
		}
		for {
			select {
			case splitUpdate := <-s.splitQueue:
				s.logger.Debug("Received Split update and proceding to perform fetch")
				s.logger.Debug(fmt.Sprintf("ChangeNumber: %d", splitUpdate.ChangeNumber()))
				// Fetch up to the notified change number; `true` requests a cache bypass.
				err := s.sync.SynchronizeSplits(common.Int64Ref(splitUpdate.ChangeNumber()), true)
				if err != nil {
					s.logger.Error(err)
				}
			case <-s.lifecycle.ShutdownRequested():
				return
			}
		}
	}()
}
// Stop requests shutdown of the worker goroutine and blocks until it finishes.
func (s *SplitUpdateWorker) Stop() {
	if !s.lifecycle.BeginShutdown() {
		// Fixed typo in log message ("runnning").
		s.logger.Debug("Split worker not running. Ignoring.")
		return
	}
	s.lifecycle.AwaitShutdownComplete()
}
// IsRunning indicates if worker is running or not
func (s *SplitUpdateWorker) IsRunning() bool {
	// Delegates to the lifecycle trait's running flag.
	return s.lifecycle.IsRunning()
}

View File

@@ -0,0 +1,158 @@
package push
import (
"fmt"
"sync"
"github.com/splitio/go-toolkit/v4/common"
"github.com/splitio/go-toolkit/v4/logging"
)
// StatusTracker keeps track of the status of the push subsystem and generates appropriate status change notifications.
type StatusTracker interface {
	// HandleOccupancy ingests an occupancy message; returns a new status to propagate, or nil.
	HandleOccupancy(*OccupancyMessage) *int64
	// HandleControl ingests a control notification; returns a new status to propagate, or nil.
	HandleControl(*ControlUpdate) *int64
	// HandleAblyError ingests a streaming error event; returns a new status to propagate, or nil.
	HandleAblyError(*AblyError) *int64
	// HandleDisconnection reports an SSE disconnection; returns a new status to propagate, or nil.
	HandleDisconnection() *int64
	// NotifySSEShutdownExpected marks the upcoming disconnection as intentional.
	NotifySSEShutdownExpected()
	// Reset restores the tracker to its initial (streaming-up) state.
	Reset()
}
// StatusTrackerImpl is a concrete implementation of the StatusTracker interface
type StatusTrackerImpl struct {
	logger                 logging.LoggerInterface
	mutex                  sync.Mutex       // guards all fields below
	occupancy              map[string]int64 // publishers per control channel ("control_pri"/"control_sec")
	lastControlTimestamp   int64            // timestamp of the newest control update applied (for ordering)
	lastOccupancyTimestamp int64
	lastControlMessage     string // last ControlType* value received
	lastStatusPropagated   int64  // last Status* value returned to the caller
	shutdownExpected       bool   // true while we are deliberately closing the SSE connection
}
// NotifySSEShutdownExpected should be called when we are forcefully closing the SSE client
func (p *StatusTrackerImpl) NotifySSEShutdownExpected() {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	// While this flag is set the Handle* methods ignore incoming events.
	p.shutdownExpected = true
}
// Reset should be called on initialization and when a new connection is being established (to start from scratch)
func (p *StatusTrackerImpl) Reset() {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	// Both control channels start at 2 publishers so occupancy is considered OK
	// until the backend reports otherwise.
	p.occupancy = map[string]int64{"control_pri": 2, "control_sec": 2}
	p.lastControlMessage = ControlTypeStreamingEnabled
	p.lastStatusPropagated = StatusUp
	p.shutdownExpected = false
}
// HandleOccupancy should be called for every occupancy notification received
func (p *StatusTrackerImpl) HandleOccupancy(message *OccupancyMessage) (newStatus *int64) {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if p.shutdownExpected {
		return nil // we don't care about occupancy if we're disconnecting
	}
	channel := message.ChannelWithoutPrefix()
	if _, ok := p.occupancy[channel]; !ok {
		// Only channels registered in Reset() ("control_pri"/"control_sec") are tracked.
		p.logger.Warning(fmt.Sprintf("received occupancy on non-registered channel '%s'. Ignoring", channel))
		return nil
	}
	p.lastOccupancyTimestamp = message.Timestamp()
	p.occupancy[channel] = message.Publishers()
	// Re-evaluate the overall push status with the new publisher count.
	return p.updateStatus()
}
// HandleAblyError should be called whenever an ably error is received
func (p *StatusTrackerImpl) HandleAblyError(errorEvent *AblyError) (newStatus *int64) {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if p.shutdownExpected {
		return nil // ignore errors while we're deliberately disconnecting
	}
	// Regardless of whether the error is retryable or not, we're going to close the connection
	p.shutdownExpected = true
	if errorEvent.IsRetryable() {
		p.logger.Info("Received retryable error message. Restarting SSE connection with backoff")
		return p.propagateStatus(StatusRetryableError)
	}
	p.logger.Info("Received non-retryable error message. Disabling streaming")
	return p.propagateStatus(StatusNonRetryableError)
}
// HandleControl should be called whenever a control notification is received
func (p *StatusTrackerImpl) HandleControl(controlUpdate *ControlUpdate) *int64 {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if p.shutdownExpected {
		return nil // ignore control updates while we're deliberately disconnecting
	}
	if p.lastControlTimestamp > controlUpdate.timestamp {
		// Out-of-order delivery: a newer control message was already applied.
		p.logger.Warning("Received an old control update. Ignoring")
		return nil
	}
	p.lastControlMessage = controlUpdate.controlType
	p.lastControlTimestamp = controlUpdate.timestamp
	return p.updateStatus()
}
// HandleDisconnection should be called whenever the SSE client gets disconnected
func (p *StatusTrackerImpl) HandleDisconnection() *int64 {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if p.shutdownExpected {
		// We initiated this disconnection: nothing to report.
		return nil
	}
	// Unexpected drop: signal a retryable error so the connection is re-established.
	return p.propagateStatus(StatusRetryableError)
}
// NewStatusTracker returns a new StatusTracker
func NewStatusTracker(logger logging.LoggerInterface) *StatusTrackerImpl {
	tracker := &StatusTrackerImpl{logger: logger}
	// Reset initializes occupancy/control state to the "streaming up" defaults.
	tracker.Reset()
	return tracker
}
// occupancyOk reports whether at least one tracked channel currently has publishers.
// Caller must hold p.mutex.
func (p *StatusTrackerImpl) occupancyOk() bool {
	for _, publisherCount := range p.occupancy {
		if publisherCount > 0 {
			return true
		}
	}
	return false
}
// updateStatus recomputes the push status from the current occupancy and the
// last control message, returning the new status when a transition happens
// (nil otherwise). Caller must hold p.mutex.
func (p *StatusTrackerImpl) updateStatus() *int64 {
	if p.lastStatusPropagated == StatusUp {
		// Streaming currently up: go down on empty occupancy or an explicit pause...
		if !p.occupancyOk() || p.lastControlMessage == ControlTypeStreamingPaused {
			return p.propagateStatus(StatusDown)
		}
		// ...and give up entirely if streaming was disabled server-side.
		if p.lastControlMessage == ControlTypeStreamingDisabled {
			return p.propagateStatus(StatusNonRetryableError)
		}
	}
	if p.lastStatusPropagated == StatusDown {
		// Streaming currently down: recover only when publishers are back AND streaming is enabled.
		if p.occupancyOk() && p.lastControlMessage == ControlTypeStreamingEnabled {
			return p.propagateStatus(StatusUp)
		}
		if p.lastControlMessage == ControlTypeStreamingDisabled {
			return p.propagateStatus(StatusNonRetryableError)
		}
	}
	return nil
}
// propagateStatus records newStatus as the last propagated status and returns a
// pointer to it for the caller to forward. Caller must hold p.mutex.
func (p *StatusTrackerImpl) propagateStatus(newStatus int64) *int64 {
	p.lastStatusPropagated = newStatus
	return common.Int64Ref(newStatus)
}

// Compile-time check that StatusTrackerImpl satisfies StatusTracker.
var _ StatusTracker = &StatusTrackerImpl{}

View File

@@ -3,9 +3,9 @@ package api
import (
"encoding/json"
"github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/conf"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/logging"
)
// AuthAPIClient struct is responsible for authenticating client for push services
@@ -24,7 +24,7 @@ func NewAuthAPIClient(apikey string, cfg conf.AdvancedConfig, logger logging.Log
// Authenticate performs authentication for push services
func (a *AuthAPIClient) Authenticate() (*dtos.Token, error) {
raw, err := a.client.Get("/api/auth")
raw, err := a.client.Get("/api/auth", map[string]string{CacheControlHeader: CacheControlNoCache})
if err != nil {
a.logger.Error("Error while authenticating for streaming", err)
return nil, err

View File

@@ -9,14 +9,20 @@ import (
"net/http"
"time"
"github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/conf"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/logging"
)
// Cache control header constants
const (
CacheControlHeader = "Cache-Control"
CacheControlNoCache = "no-cache"
)
// Client interface for HTTPClient
type Client interface {
Get(service string) ([]byte, error)
Get(service string, headers map[string]string) ([]byte, error)
Post(service string, body []byte, headers map[string]string) error
}
@@ -51,7 +57,7 @@ func NewHTTPClient(
}
// Get method is a get call to an url
func (c *HTTPClient) Get(service string) ([]byte, error) {
func (c *HTTPClient) Get(service string, headers map[string]string) ([]byte, error) {
serviceURL := c.url + service
c.logger.Debug("[GET] ", serviceURL)
req, _ := http.NewRequest("GET", serviceURL, nil)
@@ -60,15 +66,18 @@ func (c *HTTPClient) Get(service string) ([]byte, error) {
c.logger.Debug("Authorization [ApiKey]: ", logging.ObfuscateAPIKey(authorization))
req.Header.Add("Accept-Encoding", "gzip")
req.Header.Add("Content-Type", "application/json")
req.Header.Add("SplitSDKVersion", c.metadata.SDKVersion)
req.Header.Add("SplitSDKMachineName", c.metadata.MachineName)
req.Header.Add("SplitSDKMachineIP", c.metadata.MachineIP)
for headerName, headerValue := range headers {
req.Header.Add(headerName, headerValue)
}
c.logger.Debug(fmt.Sprintf("Headers: %v", req.Header))
req.Header.Add("Authorization", "Bearer "+authorization)
req.Header.Add("SplitSDKVersion", c.metadata.SDKVersion)
req.Header.Add("SplitSDKMachineName", c.metadata.MachineName)
req.Header.Add("SplitSDKMachineIP", c.metadata.MachineIP)
resp, err := c.httpClient.Do(req)
if err != nil {
c.logger.Error("Error requesting data to API: ", req.URL.String(), err.Error())
@@ -80,7 +89,10 @@ func (c *HTTPClient) Get(service string) ([]byte, error) {
var reader io.ReadCloser
switch resp.Header.Get("Content-Encoding") {
case "gzip":
reader, _ = gzip.NewReader(resp.Body)
reader, err = gzip.NewReader(resp.Body)
if err != nil {
return nil, fmt.Errorf("error parsing gzip response body: %w", err)
}
defer reader.Close()
default:
reader = resp.Body

View File

@@ -5,9 +5,9 @@ import (
"encoding/json"
"strconv"
"github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/conf"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/logging"
)
type httpFetcherBase struct {
@@ -15,7 +15,7 @@ type httpFetcherBase struct {
logger logging.LoggerInterface
}
func (h *httpFetcherBase) fetchRaw(url string, since int64) ([]byte, error) {
func (h *httpFetcherBase) fetchRaw(url string, since int64, requestNoCache bool) ([]byte, error) {
var bufferQuery bytes.Buffer
bufferQuery.WriteString(url)
@@ -23,7 +23,12 @@ func (h *httpFetcherBase) fetchRaw(url string, since int64) ([]byte, error) {
bufferQuery.WriteString("?since=")
bufferQuery.WriteString(strconv.FormatInt(since, 10))
}
data, err := h.client.Get(bufferQuery.String())
var extraHeaders map[string]string
if requestNoCache {
extraHeaders = map[string]string{CacheControlHeader: CacheControlNoCache}
}
data, err := h.client.Get(bufferQuery.String(), extraHeaders)
if err != nil {
return nil, err
}
@@ -51,8 +56,8 @@ func NewHTTPSplitFetcher(
}
// Fetch makes an http call to the split backend and returns the list of updated splits
func (f *HTTPSplitFetcher) Fetch(since int64) (*dtos.SplitChangesDTO, error) {
data, err := f.fetchRaw("/splitChanges", since)
func (f *HTTPSplitFetcher) Fetch(since int64, requestNoCache bool) (*dtos.SplitChangesDTO, error) {
data, err := f.fetchRaw("/splitChanges", since, requestNoCache)
if err != nil {
f.logger.Error("Error fetching split changes ", err)
return nil, err
@@ -89,12 +94,12 @@ func NewHTTPSegmentFetcher(
}
// Fetch issues a GET request to the split backend and returns the contents of a particular segment
func (f *HTTPSegmentFetcher) Fetch(segmentName string, since int64) (*dtos.SegmentChangesDTO, error) {
func (f *HTTPSegmentFetcher) Fetch(segmentName string, since int64, requestNoCache bool) (*dtos.SegmentChangesDTO, error) {
var bufferQuery bytes.Buffer
bufferQuery.WriteString("/segmentChanges/")
bufferQuery.WriteString(segmentName)
data, err := f.fetchRaw(bufferQuery.String(), since)
data, err := f.fetchRaw(bufferQuery.String(), since, requestNoCache)
if err != nil {
f.logger.Error(err.Error())
return nil, err

View File

@@ -3,9 +3,9 @@ package api
import (
"encoding/json"
"github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/conf"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/logging"
)
type httpRecorderBase struct {

View File

@@ -0,0 +1,123 @@
package sse
import (
"errors"
"strings"
"github.com/splitio/go-split-commons/v3/conf"
"github.com/splitio/go-toolkit/v4/logging"
"github.com/splitio/go-toolkit/v4/sse"
"github.com/splitio/go-toolkit/v4/struct/traits/lifecycle"
gtSync "github.com/splitio/go-toolkit/v4/sync"
)
const (
	version   = "1.1" // streaming protocol version sent as the "v" query param
	keepAlive = 70    // keep-alive value passed to sse.NewClient — presumably seconds; confirm against go-toolkit sse docs
)

// StreamingClient interface
type StreamingClient interface {
	// ConnectStreaming opens the SSE connection and reports status changes on streamingStatus.
	ConnectStreaming(token string, streamingStatus chan int, channelList []string, handleIncomingMessage func(IncomingMessage))
	// StopStreaming closes the SSE connection and waits for the reader goroutine to finish.
	StopStreaming()
	// IsRunning returns true while the connection is established.
	IsRunning() bool
}

// StreamingClientImpl struct
type StreamingClientImpl struct {
	sseClient *sse.Client
	logger    logging.LoggerInterface
	lifecycle lifecycle.Manager // start/stop handshake state
}

// Status constants published on the streamingStatus channel
const (
	StatusConnectionFailed = iota // connection could not be established
	StatusUnderlyingClientInUse   // wrapped SSE client was already busy (indicates a bug)
	StatusFirstEventOk            // first non-error event received; connection is healthy
	StatusDisconnected            // connection ended (normally, or via read error/timeout)
)

// IncomingMessage is an alias of sse.RawEvent
type IncomingMessage = sse.RawEvent
// NewStreamingClient creates new SSE Client
func NewStreamingClient(cfg *conf.AdvancedConfig, logger logging.LoggerInterface) *StreamingClientImpl {
	// NOTE(review): the error returned by sse.NewClient is silently discarded;
	// if construction can fail for this URL, the failure only surfaces later at
	// connect time. Surfacing it requires an interface change, so flagged only.
	sseClient, _ := sse.NewClient(cfg.StreamingServiceURL, keepAlive, logger)
	client := &StreamingClientImpl{
		sseClient: sseClient,
		logger:    logger,
	}
	client.lifecycle.Setup()
	return client
}
// ConnectStreaming opens the SSE connection in a background goroutine, pushing
// connection lifecycle events (StatusFirstEventOk, StatusDisconnected, ...)
// onto streamingStatus and forwarding every incoming event to handleIncomingMessage.
func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus chan int, channelList []string, handleIncomingMessage func(IncomingMessage)) {
	if !s.lifecycle.BeginInitialization() {
		s.logger.Info("Connection is already in process/running. Ignoring")
		return
	}
	params := make(map[string]string)
	// Fixed: channelList was previously wrapped in a no-op single-argument append().
	params["channels"] = strings.Join(channelList, ",")
	params["accessToken"] = token
	params["v"] = version
	go func() {
		defer s.lifecycle.ShutdownComplete()
		if !s.lifecycle.InitializationComplete() {
			return
		}
		firstEventReceived := gtSync.NewAtomicBool(false)
		out := s.sseClient.Do(params, func(m IncomingMessage) {
			// Signal StatusFirstEventOk exactly once, on the first non-error event.
			if firstEventReceived.TestAndSet() && !m.IsError() {
				streamingStatus <- StatusFirstEventOk
			}
			handleIncomingMessage(m)
		})
		if out == nil { // all good
			streamingStatus <- StatusDisconnected
			return
		}
		// Something didn't go as expected
		s.lifecycle.AbnormalShutdown()
		asConnectionFailedError := &sse.ErrConnectionFailed{}
		if errors.As(out, &asConnectionFailedError) {
			streamingStatus <- StatusConnectionFailed
			return
		}
		switch out {
		case sse.ErrNotIdle:
			// If this happens we have a bug
			streamingStatus <- StatusUnderlyingClientInUse
		case sse.ErrReadingStream:
			streamingStatus <- StatusDisconnected
		case sse.ErrTimeout:
			streamingStatus <- StatusDisconnected
		default:
		}
	}()
}
// StopStreaming stops streaming
func (s *StreamingClientImpl) StopStreaming() {
	if !s.lifecycle.BeginShutdown() {
		s.logger.Info("SSE client wrapper not running. Ignoring")
		return
	}
	// `true` presumably requests a blocking shutdown — confirm sse.Client.Shutdown semantics.
	s.sseClient.Shutdown(true)
	s.lifecycle.AwaitShutdownComplete()
	s.logger.Info("Stopped streaming")
}

// IsRunning returns true if the client is running
func (s *StreamingClientImpl) IsRunning() bool {
	return s.lifecycle.IsRunning()
}

View File

@@ -1,7 +1,7 @@
package service
import (
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v3/dtos"
)
// AuthClient inteface to be implemneted by AuthClient
@@ -11,12 +11,12 @@ type AuthClient interface {
// SplitFetcher interface to be implemented by Split Fetchers
type SplitFetcher interface {
Fetch(changeNumber int64) (*dtos.SplitChangesDTO, error)
Fetch(changeNumber int64, requestNoCache bool) (*dtos.SplitChangesDTO, error)
}
// SegmentFetcher interface to be implemented by Split Fetchers
type SegmentFetcher interface {
Fetch(name string, changeNumber int64) (*dtos.SegmentChangesDTO, error)
Fetch(name string, changeNumber int64, requestNoCache bool) (*dtos.SegmentChangesDTO, error)
}
// ImpressionsRecorder interface to be implemented by Impressions loggers

View File

@@ -8,8 +8,9 @@ import (
"runtime/debug"
"strings"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-split-commons/v3/service"
"github.com/splitio/go-toolkit/v4/logging"
yaml "gopkg.in/yaml.v2"
)
@@ -222,7 +223,7 @@ func parseSplitsYAML(data string) (d []dtos.SplitDTO) {
}
// Fetch parses the file and returns the appropriate structures
func (s *FileSplitFetcher) Fetch(changeNumber int64) (*dtos.SplitChangesDTO, error) {
func (s *FileSplitFetcher) Fetch(changeNumber int64, _ bool) (*dtos.SplitChangesDTO, error) {
fileContents, err := ioutil.ReadFile(s.splitFile)
if err != nil {
return nil, err
@@ -256,3 +257,5 @@ func (s *FileSplitFetcher) Fetch(changeNumber int64) (*dtos.SplitChangesDTO, err
Till: till,
}, nil
}
var _ service.SplitFetcher = &FileSplitFetcher{}

View File

@@ -1,10 +1,10 @@
package service
import (
"github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v2/service/api"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/conf"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-split-commons/v3/service/api"
"github.com/splitio/go-toolkit/v4/logging"
)
// SplitAPI struct for fetchers and recorders

View File

@@ -1,8 +1,8 @@
package storage
import (
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/datastructures/set"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/datastructures/set"
)
// SplitStorageProducer should be implemented by structs that offer writing splits in storage

View File

@@ -4,7 +4,7 @@ import (
"errors"
"strings"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-toolkit/v4/logging"
)
// MetricWrapper struct

View File

@@ -1,6 +1,6 @@
package mocks
import "github.com/splitio/go-split-commons/v2/dtos"
import "github.com/splitio/go-split-commons/v3/dtos"
// MockEventStorage is a mocked implementation of Event Storage
type MockEventStorage struct {

View File

@@ -1,6 +1,6 @@
package mocks
import "github.com/splitio/go-split-commons/v2/dtos"
import "github.com/splitio/go-split-commons/v3/dtos"
// MockImpressionStorage is a mocked implementation of Impression Storage
type MockImpressionStorage struct {

View File

@@ -1,6 +1,6 @@
package mocks
import "github.com/splitio/go-split-commons/v2/dtos"
import "github.com/splitio/go-split-commons/v3/dtos"
// MockMetricStorage is a mocked implementation of Metric Storage
type MockMetricStorage struct {

View File

@@ -1,6 +1,6 @@
package mocks
import "github.com/splitio/go-toolkit/v3/datastructures/set"
import "github.com/splitio/go-toolkit/v4/datastructures/set"
// MockSegmentStorage is a mocked implementation of Segment Storage
type MockSegmentStorage struct {

View File

@@ -1,8 +1,8 @@
package mocks
import (
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/datastructures/set"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/datastructures/set"
)
// MockSplitStorage is a mocked implementation of Split Storage

View File

@@ -3,7 +3,7 @@ package mutexmap
import (
"sync"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v3/dtos"
)
// MMMetricsStorage contains an in-memory implementation of Metrics storage

View File

@@ -4,7 +4,7 @@ import (
"fmt"
"sync"
"github.com/splitio/go-toolkit/v3/datastructures/set"
"github.com/splitio/go-toolkit/v4/datastructures/set"
)
// MMSegmentStorage contains is an in-memory implementation of segment storage

View File

@@ -3,8 +3,8 @@ package mutexmap
import (
"sync"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/datastructures/set"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/datastructures/set"
)
// MMSplitStorage struct contains is an in-memory implementation of split storage

View File

@@ -5,8 +5,8 @@ import (
"fmt"
"sync"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/logging"
)
// MaxAccumulatedBytes is the maximum size to accumulate in events before flush (in bytes)

View File

@@ -4,8 +4,8 @@ import (
"container/list"
"sync"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/logging"
)
// NewMQImpressionsStorage returns an instance of MQEventsStorage

View File

@@ -5,10 +5,10 @@ import (
"math"
"sync"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-toolkit/v3/queuecache"
"github.com/splitio/go-toolkit/v3/redis"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/logging"
"github.com/splitio/go-toolkit/v4/queuecache"
"github.com/splitio/go-toolkit/v4/redis"
)
// EventsStorage redis implementation of EventsStorage interface

View File

@@ -5,9 +5,9 @@ import (
"sync"
"time"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-toolkit/v3/redis"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/logging"
"github.com/splitio/go-toolkit/v4/redis"
)
const impressionsTTLRefresh = time.Duration(3600) * time.Second

View File

@@ -7,9 +7,9 @@ import (
"strings"
"sync"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-toolkit/v3/redis"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/logging"
"github.com/splitio/go-toolkit/v4/redis"
)
// MetricsStorage is a redis-based implementation of split storage

View File

@@ -4,8 +4,8 @@ import (
"errors"
"strings"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-toolkit/v3/redis"
"github.com/splitio/go-toolkit/v4/logging"
"github.com/splitio/go-toolkit/v4/redis"
)
// ErrorHashNotPresent constant

View File

@@ -6,10 +6,10 @@ import (
"strings"
"time"
"github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-toolkit/v3/redis"
"github.com/splitio/go-toolkit/v3/redis/helpers"
"github.com/splitio/go-split-commons/v3/conf"
"github.com/splitio/go-toolkit/v4/logging"
"github.com/splitio/go-toolkit/v4/redis"
"github.com/splitio/go-toolkit/v4/redis/helpers"
)
// NewRedisClient returns a new Prefixed Redis Client

View File

@@ -6,9 +6,9 @@ import (
"strings"
"sync"
"github.com/splitio/go-toolkit/v3/datastructures/set"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-toolkit/v3/redis"
"github.com/splitio/go-toolkit/v4/datastructures/set"
"github.com/splitio/go-toolkit/v4/logging"
"github.com/splitio/go-toolkit/v4/redis"
)
// SegmentStorage is a redis implementation of a storage for segments

View File

@@ -8,10 +8,10 @@ import (
"strings"
"sync"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-toolkit/v3/datastructures/set"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-toolkit/v3/redis"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-toolkit/v4/datastructures/set"
"github.com/splitio/go-toolkit/v4/logging"
"github.com/splitio/go-toolkit/v4/redis"
)
// SplitStorage is a redis-based implementation of split storage

View File

@@ -0,0 +1,13 @@
package synchronizer
// Synchronizer interface for syncing data to and from splits servers
type Synchronizer interface {
	// SyncAll fetches splits and segments; requestNoCache asks for HTTP caches to be bypassed.
	SyncAll(requestNoCache bool) error
	// SynchronizeSplits fetches split definitions, optionally targeting the given change number.
	SynchronizeSplits(till *int64, requestNoCache bool) error
	// LocalKill marks a split as killed locally without performing a fetch.
	LocalKill(splitName string, defaultTreatment string, changeNumber int64)
	// SynchronizeSegment fetches a single segment, optionally targeting the given change number.
	SynchronizeSegment(segmentName string, till *int64, requestNoCache bool) error
	// StartPeriodicFetching starts the polling-mode fetcher tasks.
	StartPeriodicFetching()
	// StopPeriodicFetching stops the polling-mode fetcher tasks.
	StopPeriodicFetching()
	// StartPeriodicDataRecording starts the background data-recording tasks.
	StartPeriodicDataRecording()
	// StopPeriodicDataRecording stops the background data-recording tasks.
	StopPeriodicDataRecording()
}

View File

@@ -1,13 +1,13 @@
package synchronizer
import (
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v2/service"
"github.com/splitio/go-split-commons/v2/storage"
storageMock "github.com/splitio/go-split-commons/v2/storage/mocks"
"github.com/splitio/go-split-commons/v2/synchronizer/worker/split"
"github.com/splitio/go-split-commons/v2/tasks"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-split-commons/v3/service"
"github.com/splitio/go-split-commons/v3/storage"
storageMock "github.com/splitio/go-split-commons/v3/storage/mocks"
"github.com/splitio/go-split-commons/v3/synchronizer/worker/split"
"github.com/splitio/go-split-commons/v3/tasks"
"github.com/splitio/go-toolkit/v4/logging"
)
// Local implements Local Synchronizer
@@ -47,8 +47,9 @@ func NewLocal(
}
// SyncAll syncs splits and segments
func (s *Local) SyncAll() error {
return s.workers.SplitFetcher.SynchronizeSplits(nil)
func (s *Local) SyncAll(requestNoCache bool) error {
_, err := s.workers.SplitFetcher.SynchronizeSplits(nil, requestNoCache)
return err
}
// StartPeriodicFetching starts periodic fetchers tasks
@@ -70,11 +71,16 @@ func (s *Local) StopPeriodicDataRecording() {
}
// SynchronizeSplits syncs splits
func (s *Local) SynchronizeSplits(till *int64) error {
return s.workers.SplitFetcher.SynchronizeSplits(till)
func (s *Local) SynchronizeSplits(till *int64, requestNoCache bool) error {
	// Fixed: `nil` was passed instead of the caller-supplied `till`, so
	// till-targeted fetches silently behaved like unconditional ones.
	_, err := s.workers.SplitFetcher.SynchronizeSplits(till, requestNoCache)
	return err
}
// SynchronizeSegment syncs segment
func (s *Local) SynchronizeSegment(name string, till *int64) error {
func (s *Local) SynchronizeSegment(name string, till *int64, _ bool) error {
return nil
}
// LocalKill does nothing
func (s *Local) LocalKill(splitName string, defaultTreatment string, changeNumber int64) {
}

View File

@@ -0,0 +1,207 @@
package synchronizer
import (
"errors"
"sync/atomic"
"time"
"github.com/splitio/go-split-commons/v3/conf"
"github.com/splitio/go-split-commons/v3/push"
"github.com/splitio/go-split-commons/v3/service"
"github.com/splitio/go-split-commons/v3/storage"
"github.com/splitio/go-toolkit/v4/backoff"
"github.com/splitio/go-toolkit/v4/logging"
"github.com/splitio/go-toolkit/v4/struct/traits/lifecycle"
)
const (
	// Ready represents ready (initial SyncAll finished; emitted on managerStatus)
	Ready = iota
	// StreamingReady indicates the streaming connection came up — TODO confirm emit site (not visible here)
	StreamingReady
	// Error represents some error in SSE streaming
	Error
)

// Operation mode constants
const (
	// Streaming: updates arrive pushed over SSE
	Streaming = iota
	// Polling: updates are pulled by periodic fetchers
	Polling
)

// Manager interface
type Manager interface {
	// Start begins synchronization (streaming when enabled, polling otherwise).
	Start()
	// Stop halts synchronization.
	Stop()
	// IsRunning returns true while the manager is active.
	IsRunning() bool
}

// ManagerImpl struct
type ManagerImpl struct {
	synchronizer    Synchronizer
	logger          logging.LoggerInterface
	config          conf.AdvancedConfig
	pushManager     push.Manager // nil unless config.StreamingEnabled (see NewSynchronizerManager)
	managerStatus   chan int     // Ready / Error (and streaming) notifications to the caller
	streamingStatus chan int64   // status events from the push subsystem
	operationMode   int32        // Streaming or Polling
	lifecycle       lifecycle.Manager // start/stop handshake state
	backoff         backoff.Interface // retry backoff
}
// NewSynchronizerManager creates a new sync manager. managerStatus must be a
// non-nil channel with capacity of at least 1, since status updates are sent
// without blocking. When streaming is enabled in config, the push (SSE)
// subsystem is wired up as well; its construction error is propagated.
func NewSynchronizerManager(
	synchronizer Synchronizer,
	logger logging.LoggerInterface,
	config conf.AdvancedConfig,
	authClient service.AuthClient,
	splitStorage storage.SplitStorage,
	managerStatus chan int,
) (*ManagerImpl, error) {
	if managerStatus == nil || cap(managerStatus) < 1 {
		return nil, errors.New("Status channel cannot be nil nor having capacity")
	}

	m := &ManagerImpl{
		backoff:       backoff.New(),
		synchronizer:  synchronizer,
		logger:        logger,
		config:        config,
		managerStatus: managerStatus,
	}
	m.lifecycle.Setup()

	if !config.StreamingEnabled {
		// Polling-only setup: no push subsystem required.
		return m, nil
	}

	// Streaming enabled: build the push manager and its status channel.
	statusCh := make(chan int64, 1000)
	pm, err := push.NewManager(logger, synchronizer, &config, statusCh, authClient)
	if err != nil {
		return nil, err
	}
	m.pushManager = pm
	m.streamingStatus = statusCh
	return m, nil
}
// IsRunning returns true if the manager is active (in Streaming or Polling),
// delegating to the lifecycle state machine.
func (s *ManagerImpl) IsRunning() bool {
	return s.lifecycle.IsRunning()
}
// Start starts synchronization through Split: it performs the initial full
// sync, reports Ready or Error on the status channel, then enters either
// streaming or polling mode. Calling Start while already running is a no-op.
func (s *ManagerImpl) Start() {
	if !s.lifecycle.BeginInitialization() {
		s.logger.Info("Manager is already running, skipping start")
		return
	}
	// It's safe to drain the channel here, since it's guaranteed that the manager status is "starting"
	// push manager is still stopped
	for len(s.managerStatus) > 0 {
		<-s.managerStatus
	}
	// Initial full sync; on failure report Error and unwind the lifecycle.
	err := s.synchronizer.SyncAll(false)
	if err != nil {
		defer s.lifecycle.ShutdownComplete()
		s.managerStatus <- Error
		return
	}
	if !s.lifecycle.InitializationComplete() {
		// Shutdown was requested while the initial sync was in flight.
		defer s.lifecycle.ShutdownComplete()
		return
	}
	s.logger.Debug("SyncAll Ready")
	s.managerStatus <- Ready
	s.synchronizer.StartPeriodicDataRecording()
	if !s.config.StreamingEnabled {
		s.logger.Info("SDK initialized in polling mode")
		s.startPolling()
		go func() { // create a goroutine that stops everything (the same way the streaming status watcher would)
			<-s.lifecycle.ShutdownRequested()
			s.stop()
		}()
		return
	}
	// Start streaming
	s.logger.Info("SDK Initialized in streaming mode")
	s.pushManager.Start()
	go s.pushStatusWatcher()
}
// stop tears down the push subsystem (when present) and all periodic tasks,
// then marks lifecycle shutdown as complete. Internal helper invoked from the
// polling watcher goroutine and from pushStatusWatcher's deferred cleanup.
func (s *ManagerImpl) stop() {
	if s.pushManager != nil {
		s.pushManager.Stop()
	}
	s.synchronizer.StopPeriodicFetching()
	s.synchronizer.StopPeriodicDataRecording()
	s.lifecycle.ShutdownComplete()
}
// Stop stops synchronization through Split and blocks until teardown has
// completed. Only the first concurrent caller triggers the shutdown; later
// calls (or calls before Start) are ignored.
func (s *ManagerImpl) Stop() {
	if !s.lifecycle.BeginShutdown() {
		s.logger.Info("sync manager not yet running, skipping shutdown.")
		return
	}
	s.logger.Info("Stopping all synchronization tasks")
	s.lifecycle.AwaitShutdownComplete()
}
// pushStatusWatcher consumes status events from the push (SSE) subsystem and
// switches between streaming and polling accordingly: StatusUp resumes
// streaming, StatusDown/errors fall back to polling (retryable errors restart
// push after a backoff delay). Runs until shutdown is requested, then tears
// everything down via the deferred stop(). Fix: the StatusDown log message
// read "switchin" instead of "switching".
func (s *ManagerImpl) pushStatusWatcher() {
	defer s.stop()
	for {
		select {
		case <-s.lifecycle.ShutdownRequested():
			return
		case status := <-s.streamingStatus:
			switch status {
			case push.StatusUp:
				s.stopPolling()
				s.logger.Info("streaming up and running")
				s.enableStreaming()
				// Best-effort catch-up sync after reconnecting; an error here
				// is surfaced by subsequent fetches, so it is not checked.
				s.synchronizer.SyncAll(true)
			case push.StatusDown:
				s.logger.Info("streaming down, switching to polling")
				s.synchronizer.SyncAll(false)
				s.pauseStreaming()
				s.startPolling()
			case push.StatusRetryableError:
				howLong := s.backoff.Next()
				s.logger.Error("retryable error in streaming subsystem. Switching to polling and retrying in ", howLong, " seconds")
				s.pushManager.Stop()
				s.synchronizer.SyncAll(false)
				s.startPolling()
				time.Sleep(howLong)
				s.pushManager.Start()
			case push.StatusNonRetryableError:
				s.logger.Error("non retryable error in streaming subsystem. Switching to polling until next SDK initialization")
				s.pushManager.Stop()
				s.synchronizer.SyncAll(false)
				s.startPolling()
			}
		}
	}
}
// startPolling flips the operation mode to Polling and starts the periodic
// fetcher tasks on the underlying synchronizer.
func (s *ManagerImpl) startPolling() {
	atomic.StoreInt32(&s.operationMode, Polling)
	s.synchronizer.StartPeriodicFetching()
}
// stopPolling halts the periodic fetcher tasks. Note: operationMode is not
// updated here; enableStreaming sets it when streaming takes over.
func (s *ManagerImpl) stopPolling() {
	s.synchronizer.StopPeriodicFetching()
}
// pauseStreaming stops the push workers so that no streamed updates are
// applied while the SDK falls back to polling (StatusDown path in
// pushStatusWatcher). Fix: this previously called StartWorkers — identical to
// enableStreaming — so streaming was never actually paused.
func (s *ManagerImpl) pauseStreaming() {
	s.pushManager.StopWorkers()
}
// enableStreaming (re)starts the push workers, records Streaming as the
// current operation mode, and resets the retry backoff now that the
// streaming subsystem is healthy again.
func (s *ManagerImpl) enableStreaming() {
	s.pushManager.StartWorkers()
	atomic.StoreInt32(&s.operationMode, Streaming)
	s.backoff.Reset()
}

View File

@@ -1,16 +1,16 @@
package synchronizer
import (
"github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-split-commons/v2/synchronizer/worker/event"
"github.com/splitio/go-split-commons/v2/synchronizer/worker/impression"
"github.com/splitio/go-split-commons/v2/synchronizer/worker/impressionscount"
"github.com/splitio/go-split-commons/v2/synchronizer/worker/metric"
"github.com/splitio/go-split-commons/v2/synchronizer/worker/segment"
"github.com/splitio/go-split-commons/v2/synchronizer/worker/split"
"github.com/splitio/go-split-commons/v2/tasks"
"github.com/splitio/go-toolkit/v3/asynctask"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/conf"
"github.com/splitio/go-split-commons/v3/synchronizer/worker/event"
"github.com/splitio/go-split-commons/v3/synchronizer/worker/impression"
"github.com/splitio/go-split-commons/v3/synchronizer/worker/impressionscount"
"github.com/splitio/go-split-commons/v3/synchronizer/worker/metric"
"github.com/splitio/go-split-commons/v3/synchronizer/worker/segment"
"github.com/splitio/go-split-commons/v3/synchronizer/worker/split"
"github.com/splitio/go-split-commons/v3/tasks"
"github.com/splitio/go-toolkit/v4/asynctask"
"github.com/splitio/go-toolkit/v4/logging"
)
// SplitTasks struct for tasks
@@ -25,8 +25,8 @@ type SplitTasks struct {
// Workers struct for workers
type Workers struct {
SplitFetcher split.SplitFetcher
SegmentFetcher segment.SegmentFetcher
SplitFetcher split.Updater
SegmentFetcher segment.Updater
TelemetryRecorder metric.MetricRecorder
ImpressionRecorder impression.ImpressionRecorder
EventRecorder event.EventRecorder
@@ -83,12 +83,12 @@ func (s *SynchronizerImpl) dataFlusher() {
}
// SyncAll syncs splits and segments
func (s *SynchronizerImpl) SyncAll() error {
err := s.workers.SplitFetcher.SynchronizeSplits(nil)
func (s *SynchronizerImpl) SyncAll(requestNoCache bool) error {
_, err := s.workers.SplitFetcher.SynchronizeSplits(nil, requestNoCache)
if err != nil {
return err
}
return s.workers.SegmentFetcher.SynchronizeSegments()
return s.workers.SegmentFetcher.SynchronizeSegments(requestNoCache)
}
// StartPeriodicFetching starts periodic fetchers tasks
@@ -148,11 +148,32 @@ func (s *SynchronizerImpl) StopPeriodicDataRecording() {
}
// SynchronizeSplits syncs splits
func (s *SynchronizerImpl) SynchronizeSplits(till *int64) error {
return s.workers.SplitFetcher.SynchronizeSplits(till)
func (s *SynchronizerImpl) SynchronizeSplits(till *int64, requstNoCache bool) error {
referencedSegments, err := s.workers.SplitFetcher.SynchronizeSplits(till, requstNoCache)
for _, segment := range s.filterCachedSegments(referencedSegments) {
go s.SynchronizeSegment(segment, nil, true) // send segment to workerpool (queue is bypassed)
}
return err
}
func (s *SynchronizerImpl) filterCachedSegments(segmentsReferenced []string) []string {
toRet := make([]string, 0, len(segmentsReferenced))
for _, name := range segmentsReferenced {
if !s.workers.SegmentFetcher.IsSegmentCached(name) {
toRet = append(toRet, name)
}
}
return toRet
}
// LocalKill locally kills a split
func (s *SynchronizerImpl) LocalKill(splitName string, defaultTreatment string, changeNumber int64) {
s.workers.SplitFetcher.LocalKill(splitName, defaultTreatment, changeNumber)
}
// SynchronizeSegment syncs segment
func (s *SynchronizerImpl) SynchronizeSegment(name string, till *int64) error {
return s.workers.SegmentFetcher.SynchronizeSegment(name, till)
func (s *SynchronizerImpl) SynchronizeSegment(name string, till *int64, requstNoCache bool) error {
return s.workers.SegmentFetcher.SynchronizeSegment(name, till, requstNoCache)
}
var _ Synchronizer = &SynchronizerImpl{}

View File

@@ -2,13 +2,14 @@ package event
import (
"errors"
"strconv"
"time"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v2/service"
"github.com/splitio/go-split-commons/v2/storage"
"github.com/splitio/go-split-commons/v2/util"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-split-commons/v3/service"
"github.com/splitio/go-split-commons/v3/storage"
"github.com/splitio/go-split-commons/v3/util"
"github.com/splitio/go-toolkit/v4/logging"
)
// RecorderSingle struct for event sync
@@ -54,7 +55,7 @@ func (e *RecorderSingle) SynchronizeEvents(bulkSize int64) error {
err = e.eventRecorder.Record(queuedEvents, e.metadata)
if err != nil {
if httpError, ok := err.(*dtos.HTTPError); ok {
e.metricsWrapper.StoreCounters(storage.PostEventsCounter, string(httpError.Code))
e.metricsWrapper.StoreCounters(storage.PostEventsCounter, strconv.Itoa(httpError.Code))
}
return err
}

View File

@@ -2,14 +2,15 @@ package impression
import (
"errors"
"strconv"
"time"
"github.com/splitio/go-split-commons/v2/conf"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v2/service"
"github.com/splitio/go-split-commons/v2/storage"
"github.com/splitio/go-split-commons/v2/util"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/conf"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-split-commons/v3/service"
"github.com/splitio/go-split-commons/v3/storage"
"github.com/splitio/go-split-commons/v3/util"
"github.com/splitio/go-toolkit/v4/logging"
)
const (
@@ -95,7 +96,7 @@ func (i *RecorderSingle) SynchronizeImpressions(bulkSize int64) error {
err = i.impressionRecorder.Record(bulkImpressions, i.metadata, map[string]string{splitSDKImpressionsMode: i.mode})
if err != nil {
if httpError, ok := err.(*dtos.HTTPError); ok {
i.metricsWrapper.StoreCounters(storage.TestImpressionsCounter, string(httpError.Code))
i.metricsWrapper.StoreCounters(storage.TestImpressionsCounter, strconv.Itoa(httpError.Code))
}
return err
}

View File

@@ -1,10 +1,10 @@
package impressionscount
import (
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v2/provisional"
"github.com/splitio/go-split-commons/v2/service"
"github.com/splitio/go-toolkit/v3/logging"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-split-commons/v3/provisional"
"github.com/splitio/go-split-commons/v3/service"
"github.com/splitio/go-toolkit/v4/logging"
)
// RecorderSingle struct for impressionsCount sync

View File

@@ -3,9 +3,9 @@ package metric
import (
"errors"
"github.com/splitio/go-split-commons/v2/dtos"
"github.com/splitio/go-split-commons/v2/service"
"github.com/splitio/go-split-commons/v2/storage"
"github.com/splitio/go-split-commons/v3/dtos"
"github.com/splitio/go-split-commons/v3/service"
"github.com/splitio/go-split-commons/v3/storage"
)
// RecorderSingle struct for metric sync

View File

@@ -0,0 +1,9 @@
package segment

// Updater synchronizes segment definitions with the backend and exposes the
// local segment cache state.
type Updater interface {
	// SynchronizeSegment fetches a single segment, optionally up to the given
	// change number; requestNoCache forces a cache-busting request.
	SynchronizeSegment(name string, till *int64, requestNoCache bool) error
	// SynchronizeSegments fetches all segments; requestNoCache as above.
	SynchronizeSegments(requestNoCache bool) error
	// SegmentNames returns the cached segment names (as []interface{};
	// presumably strings — verify against the implementation).
	SegmentNames() []interface{}
	// IsSegmentCached reports whether the named segment is already stored locally.
	IsSegmentCached(segmentName string) bool
}

Some files were not shown because too many files have changed in this diff Show More