From b1222be02e8f217836006946f155b2e2f45d8c68 Mon Sep 17 00:00:00 2001 From: Georges Chaudy Date: Fri, 14 Feb 2025 11:18:59 +0100 Subject: [PATCH] unistore: add small buffer of watched events (#100431) * change log level * Add a small buffer when watching events --- pkg/storage/unified/search/bleve.go | 2 +- pkg/storage/unified/sql/backend.go | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/storage/unified/search/bleve.go b/pkg/storage/unified/search/bleve.go index 2b76bb97529..185e0388cd4 100644 --- a/pkg/storage/unified/search/bleve.go +++ b/pkg/storage/unified/search/bleve.go @@ -218,7 +218,7 @@ func (b *bleveBackend) cleanOldIndexes(dir string, skip string) { if err != nil { b.log.Error("Unable to remove old index folder", "directory", fpath, "error", err) } else { - b.log.Error("Removed old index folder", "directory", fpath) + b.log.Info("Removed old index folder", "directory", fpath) } } } diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index 7a7d9990d6a..777164b9e2b 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -26,6 +26,7 @@ import ( const tracePrefix = "sql.resource." const defaultPollingInterval = 100 * time.Millisecond +const defaultWatchBufferSize = 100 // number of events to buffer in the watch stream type Backend interface { resource.StorageBackend @@ -37,6 +38,7 @@ type BackendOptions struct { DBProvider db.DBProvider Tracer trace.Tracer PollingInterval time.Duration + WatchBufferSize int } func NewBackend(opts BackendOptions) (Backend, error) { @@ -52,6 +54,9 @@ func NewBackend(opts BackendOptions) (Backend, error) { if pollingInterval == 0 { pollingInterval = defaultPollingInterval } + if opts.WatchBufferSize == 0 { + opts.WatchBufferSize = defaultWatchBufferSize + } return &backend{ done: ctx.Done(), cancel: cancel, @@ -59,6 +64,7 @@ func NewBackend(opts BackendOptions) (Backend, error) { tracer: opts.Tracer, dbProvider: opts.DBProvider, pollingInterval: pollingInterval, + watchBufferSize: opts.WatchBufferSize, batchLock: &batchLock{running: make(map[string]bool)}, }, nil } @@ -83,6 +89,7 @@ type backend struct { // watch streaming //stream chan *resource.WatchEvent pollingInterval time.Duration + watchBufferSize int } func (b *backend) Init(ctx context.Context) error { @@ -706,7 +713,7 @@ func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.Writte return nil, fmt.Errorf("watch, get latest resource version: %w", err) } // Start the poller - stream := make(chan *resource.WrittenEvent) + stream := make(chan *resource.WrittenEvent, b.watchBufferSize) go b.poller(ctx, since, stream) return stream, nil }