unistore: add small buffer of watched events (#100431)

* change log level

* Add a small buffer when watching events
This commit is contained in:
Georges Chaudy
2025-02-14 11:18:59 +01:00
committed by GitHub
parent f1b4678012
commit b1222be02e
2 changed files with 9 additions and 2 deletions

View File

@@ -218,7 +218,7 @@ func (b *bleveBackend) cleanOldIndexes(dir string, skip string) {
if err != nil {
b.log.Error("Unable to remove old index folder", "directory", fpath, "error", err)
} else {
b.log.Error("Removed old index folder", "directory", fpath)
b.log.Info("Removed old index folder", "directory", fpath)
}
}
}

View File

@@ -26,6 +26,7 @@ import (
const tracePrefix = "sql.resource."
const defaultPollingInterval = 100 * time.Millisecond
const defaultWatchBufferSize = 100 // number of events to buffer in the watch stream
type Backend interface {
resource.StorageBackend
@@ -37,6 +38,7 @@ type BackendOptions struct {
DBProvider db.DBProvider
Tracer trace.Tracer
PollingInterval time.Duration
WatchBufferSize int
}
func NewBackend(opts BackendOptions) (Backend, error) {
@@ -52,6 +54,9 @@ func NewBackend(opts BackendOptions) (Backend, error) {
if pollingInterval == 0 {
pollingInterval = defaultPollingInterval
}
if opts.WatchBufferSize == 0 {
opts.WatchBufferSize = defaultWatchBufferSize
}
return &backend{
done: ctx.Done(),
cancel: cancel,
@@ -59,6 +64,7 @@ func NewBackend(opts BackendOptions) (Backend, error) {
tracer: opts.Tracer,
dbProvider: opts.DBProvider,
pollingInterval: pollingInterval,
watchBufferSize: opts.WatchBufferSize,
batchLock: &batchLock{running: make(map[string]bool)},
}, nil
}
@@ -83,6 +89,7 @@ type backend struct {
// watch streaming
//stream chan *resource.WatchEvent
pollingInterval time.Duration
watchBufferSize int
}
func (b *backend) Init(ctx context.Context) error {
@@ -706,7 +713,7 @@ func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.Writte
return nil, fmt.Errorf("watch, get latest resource version: %w", err)
}
// Start the poller
stream := make(chan *resource.WrittenEvent)
stream := make(chan *resource.WrittenEvent, b.watchBufferSize)
go b.poller(ctx, since, stream)
return stream, nil
}