watch from head

This commit is contained in:
Georges Chaudy 2024-07-08 10:49:25 +02:00
parent 396979bcf6
commit 80bf285216
No known key found for this signature in database
GPG Key ID: 0EE887FFCA1DB6EF
2 changed files with 62 additions and 9 deletions

View File

@ -387,21 +387,30 @@ func (b *backend) PrepareList(ctx context.Context, req *resource.ListRequest) (*
}
func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
if err := b.Init(); err != nil {
return nil, err
}
// Fetch the lastest RV
rows, err := b.sqlDB.QueryContext(ctx, `SELECT COALESCE(max("resource_version"), 0) FROM "resource";`)
if err != nil {
return nil, fmt.Errorf("fetch latest rv: %w", err)
}
since := int64(0)
if rows.Next() {
if err := rows.Scan(&since); err != nil {
return nil, fmt.Errorf("scan since resource version: %w", err)
}
}
// Start the poller
stream := make(chan *resource.WrittenEvent)
go b.poller(ctx, stream)
go b.poller(ctx, since, stream)
return stream, nil
}
func (b *backend) poller(ctx context.Context, stream chan<- *resource.WrittenEvent) {
func (b *backend) poller(ctx context.Context, since int64, stream chan<- *resource.WrittenEvent) {
var err error
// FIXME: we need a way to state startup of server from a (Group, Resource)
// standpoint, and consider that new (Group, Resource) may be added to
// `kind_version`, so we should probably also poll for changes in there
since := int64(0)
interval := 100 * time.Millisecond // TODO make this configurable
t := time.NewTicker(interval)
defer close(stream)
defer t.Stop()
@ -433,7 +442,6 @@ func (b *backend) poll(ctx context.Context, since int64, stream chan<- *resource
if err != nil {
return 0, fmt.Errorf("execute SQL template to poll for resource history: %w", err)
}
rows, err := b.sqlDB.QueryContext(ctx, query, pollReq.GetArgs()...)
if err != nil {
return 0, fmt.Errorf("poll for resource history: %w", err)

View File

@ -143,3 +143,48 @@ func TestBackendHappyPath(t *testing.T) {
assert.Equal(t, resource.WatchEvent_DELETED, event.Type)
})
}
func TestBackendWatchWriteEventsFromHead(t *testing.T) {
ctx := context.Background()
dbstore := db.InitTestDB(t)
rdb, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorage), nil)
assert.NoError(t, err)
store, err := NewBackendStore(backendOptions{
DB: rdb,
})
assert.NoError(t, err)
assert.NotNil(t, store)
// Create a few resources before initing the watch
_, err = store.WriteEvent(ctx, resource.WriteEvent{
Type: resource.WatchEvent_ADDED,
Value: []byte("initial value 0"),
Key: &resource.ResourceKey{
Namespace: "namespace",
Group: "group",
Resource: "resource",
Name: "item 0",
},
})
assert.NoError(t, err)
// Start the watch
stream, err := store.WatchWriteEvents(ctx)
assert.NoError(t, err)
// Create one more event
_, err = store.WriteEvent(ctx, resource.WriteEvent{
Type: resource.WatchEvent_ADDED,
Value: []byte("initial value 2"),
Key: &resource.ResourceKey{
Namespace: "namespace",
Group: "group",
Resource: "resource",
Name: "item2",
},
})
assert.NoError(t, err)
assert.Equal(t, "item2", (<-stream).Key.Name)
}