Unistore: Ensure Watch works in HA mode (#93428)

* Replace Watch with WatchNext

* remove watchset

* fix previous page and closing the channel

* Remove the broadcaster cache to prevent duplicated events

* add watch bookmark

* add watch bookmark

* cleanup comments

* disable the tests for bookmarks for now

* Ensure we send previous events

* lint

* re-introduce the cache

* load from cache

* disabling legacy test

* disabling legacy test

* Update pkg/storage/unified/resource/server.go

Co-authored-by: Diego Augusto Molina <diegoaugustomolina@gmail.com>

* Could not read previous events

* add proper migration

* Add previous_resource_version to both history and resource

* First event should have an RV of 2 and not 1

* Test both storage backends

* fix the initial RV for the SQL backend

* ensure graceful stop of the stream decoder

* gocyclo

---------

Co-authored-by: Diego Augusto Molina <diegoaugustomolina@gmail.com>
Georges Chaudy 2024-09-30 13:14:07 +02:00 committed by GitHub
parent e1146120f4
commit 0a26c9e9ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 475 additions and 699 deletions

View File

@ -1407,22 +1407,25 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
{
name: "legacy, RV=0",
resourceVersion: "0",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
{
name: "legacy, RV=unset",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
// Not Supported by unistore because there is no way to differentiate between:
// - SendInitialEvents=nil && resourceVersion=0
// - sendInitialEvents=false && resourceVersion=0
// This is a Legacy feature in k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go#196
// {
// name: "legacy, RV=0",
// resourceVersion: "0",
// initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
// expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
// podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
// expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
// },
// {
// name: "legacy, RV=unset",
// initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
// expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
// podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
// expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
// },
}
for idx, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {

View File

@ -26,7 +26,6 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"github.com/grafana/grafana/pkg/apimachinery/utils"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
@ -51,7 +50,6 @@ type Storage struct {
store resource.ResourceClient
getKey func(string) (*resource.ResourceKey, error)
watchSet *WatchSet
versioner storage.Versioner
}
@ -84,8 +82,7 @@ func NewStorage(
trigger: trigger,
indexers: indexers,
watchSet: NewWatchSet(),
getKey: keyParser,
getKey: keyParser,
versioner: &storage.APIObjectVersioner{},
}
@ -112,9 +109,7 @@ func NewStorage(
}
}
return s, func() {
s.watchSet.cleanupWatchers()
}, nil
return s, func() {}, nil
}
func (s *Storage) Versioner() storage.Versioner {
@ -165,11 +160,6 @@ func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, ou
})
}
s.watchSet.notifyWatchers(watch.Event{
Object: out.DeepCopyObject(),
Type: watch.Added,
}, nil)
return nil
}
@ -226,16 +216,11 @@ func (s *Storage) Delete(
if err := s.versioner.UpdateObject(out, uint64(rsp.ResourceVersion)); err != nil {
return err
}
s.watchSet.notifyWatchers(watch.Event{
Object: out.DeepCopyObject(),
Type: watch.Deleted,
}, nil)
return nil
}
// This version is not yet passing the watch tests
func (s *Storage) WatchNEXT(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
k, err := s.getKey(key)
if err != nil {
return watch.NewEmptyWatch(), nil
@ -255,10 +240,11 @@ func (s *Storage) WatchNEXT(ctx context.Context, key string, opts storage.ListOp
if opts.SendInitialEvents != nil {
cmd.SendInitialEvents = *opts.SendInitialEvents
}
ctx, cancelWatch := context.WithCancel(ctx)
client, err := s.store.Watch(ctx, cmd)
if err != nil {
// if the context was canceled, just return a new empty watch
cancelWatch()
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, io.EOF) {
return watch.NewEmptyWatch(), nil
}
@ -266,138 +252,11 @@ func (s *Storage) WatchNEXT(ctx context.Context, key string, opts storage.ListOp
}
reporter := apierrors.NewClientErrorReporter(500, "WATCH", "")
decoder := &streamDecoder{
client: client,
newFunc: s.newFunc,
predicate: predicate,
codec: s.codec,
}
decoder := newStreamDecoder(client, s.newFunc, predicate, s.codec, cancelWatch)
return watch.NewStreamWatcher(decoder, reporter), nil
}
// Watch begins watching the specified key. Events are decoded into API objects,
// and any items selected by the predicate are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will get current object at given key
// and send it in an "ADDED" event, before watch starts.
func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
k, err := s.getKey(key)
if err != nil {
return watch.NewEmptyWatch(), nil
}
req, predicate, err := toListRequest(k, opts)
if err != nil {
return watch.NewEmptyWatch(), nil
}
listObj := s.newListFunc()
var namespace *string
if k.Namespace != "" {
namespace = &k.Namespace
}
if ctx.Err() != nil {
return watch.NewEmptyWatch(), nil
}
if (opts.SendInitialEvents == nil && req.ResourceVersion == 0) || (opts.SendInitialEvents != nil && *opts.SendInitialEvents) {
if err := s.GetList(ctx, key, opts, listObj); err != nil {
return nil, err
}
listAccessor, err := meta.ListAccessor(listObj)
if err != nil {
klog.Errorf("could not determine new list accessor in watch")
return nil, err
}
// Updated if requesting RV was either "0" or ""
maybeUpdatedRV, err := s.versioner.ParseResourceVersion(listAccessor.GetResourceVersion())
if err != nil {
klog.Errorf("could not determine new list RV in watch")
return nil, err
}
jw := s.watchSet.newWatch(ctx, maybeUpdatedRV, predicate, s.versioner, namespace)
initEvents := make([]watch.Event, 0)
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return nil, err
}
v, err := conversion.EnforcePtr(listPtr)
if err != nil || v.Kind() != reflect.Slice {
return nil, fmt.Errorf("need pointer to slice: %v", err)
}
for i := 0; i < v.Len(); i++ {
obj, ok := v.Index(i).Addr().Interface().(runtime.Object)
if !ok {
return nil, fmt.Errorf("need item to be a runtime.Object: %v", err)
}
initEvents = append(initEvents, watch.Event{
Type: watch.Added,
Object: obj.DeepCopyObject(),
})
}
if predicate.AllowWatchBookmarks && len(initEvents) > 0 {
listRV, err := s.versioner.ParseResourceVersion(listAccessor.GetResourceVersion())
if err != nil {
return nil, fmt.Errorf("could not get last init event's revision for bookmark: %v", err)
}
bookmarkEvent := watch.Event{
Type: watch.Bookmark,
Object: s.newFunc(),
}
if err := s.versioner.UpdateObject(bookmarkEvent.Object, listRV); err != nil {
return nil, err
}
bookmarkObject, err := meta.Accessor(bookmarkEvent.Object)
if err != nil {
return nil, fmt.Errorf("could not get bookmark object's acccesor: %v", err)
}
bookmarkObject.SetAnnotations(map[string]string{"k8s.io/initial-events-end": "true"})
initEvents = append(initEvents, bookmarkEvent)
}
jw.Start(initEvents...)
return jw, nil
}
maybeUpdatedRV := uint64(req.ResourceVersion)
if maybeUpdatedRV == 0 {
rsp, err := s.store.List(ctx, &resource.ListRequest{
Options: &resource.ListOptions{
Key: k,
},
Limit: 1, // we ignore the results, just look at the RV
})
if err != nil {
return nil, err
}
if rsp.Error != nil {
return nil, resource.GetError(rsp.Error)
}
maybeUpdatedRV = uint64(rsp.ResourceVersion)
if maybeUpdatedRV < 1 {
return nil, fmt.Errorf("expecting a non-zero resource version")
}
}
jw := s.watchSet.newWatch(ctx, maybeUpdatedRV, predicate, s.versioner, namespace)
jw.Start()
return jw, nil
}
// Get unmarshals object found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
// Treats empty responses and nil response nodes exactly like a not found error.
@ -668,17 +527,6 @@ func (s *Storage) GuaranteedUpdate(
return err
}
if created {
s.watchSet.notifyWatchers(watch.Event{
Object: destination.DeepCopyObject(),
Type: watch.Added,
}, nil)
} else {
s.watchSet.notifyWatchers(watch.Event{
Object: destination.DeepCopyObject(),
Type: watch.Modified,
}, existingObj.DeepCopyObject())
}
return nil
}
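
With the WatchSet notifications removed, change delivery flows entirely through the Watch path above. A minimal sketch of how an apiserver-side caller might drive it, assuming a Storage created by NewStorage; the key string and function name are illustrative:

// Assumed imports: context, k8s.io/apimachinery/pkg/watch, k8s.io/apiserver/pkg/storage.
func watchExample(ctx context.Context, store storage.Interface) error {
	sendInitial := true
	w, err := store.Watch(ctx, "/playlist.grafana.app/playlists/default/example", storage.ListOptions{
		ResourceVersion:   "",
		SendInitialEvents: &sendInitial, // ask for the current state as ADDED events first
		Predicate:         storage.Everything,
	})
	if err != nil {
		return err
	}
	defer w.Stop()
	for evt := range w.ResultChan() {
		switch evt.Type {
		case watch.Bookmark:
			// marks the end of the initial snapshot when bookmarks are allowed
		case watch.Added, watch.Modified, watch.Deleted:
			_ = evt.Object // decoded runtime.Object for the event
		}
	}
	return nil
}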

View File

@ -92,12 +92,13 @@ func TestCreate(t *testing.T) {
storagetesting.RunTestCreate(ctx, t, store, checkStorageInvariants(store))
}
func TestCreateWithTTL(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestCreateWithTTL(ctx, t, store)
}
// No TTL support in unified storage
// func TestCreateWithTTL(t *testing.T) {
// ctx, store, destroyFunc, err := testSetup(t)
// defer destroyFunc()
// assert.NoError(t, err)
// storagetesting.RunTestCreateWithTTL(ctx, t, store)
// }
func TestCreateWithKeyExist(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)

View File

@ -1,9 +1,11 @@
package apistore
import (
"context"
"errors"
"fmt"
"io"
"sync"
grpcCodes "google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status"
@ -17,12 +19,23 @@ import (
)
type streamDecoder struct {
client resource.ResourceStore_WatchClient
newFunc func() runtime.Object
predicate storage.SelectionPredicate
codec runtime.Codec
client resource.ResourceStore_WatchClient
newFunc func() runtime.Object
predicate storage.SelectionPredicate
codec runtime.Codec
cancelWatch context.CancelFunc
done sync.WaitGroup
}
func newStreamDecoder(client resource.ResourceStore_WatchClient, newFunc func() runtime.Object, predicate storage.SelectionPredicate, codec runtime.Codec, cancelWatch context.CancelFunc) *streamDecoder {
return &streamDecoder{
client: client,
newFunc: newFunc,
predicate: predicate,
codec: codec,
cancelWatch: cancelWatch,
}
}
func (d *streamDecoder) toObject(w *resource.WatchEvent_Resource) (runtime.Object, error) {
obj, _, err := d.codec.Decode(w.Value, nil, d.newFunc())
if err == nil {
@ -35,25 +48,30 @@ func (d *streamDecoder) toObject(w *resource.WatchEvent_Resource) (runtime.Objec
return obj, err
}
// nolint: gocyclo // we may be able to simplify this in the future, but this is a complex function by nature
func (d *streamDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
d.done.Add(1)
defer d.done.Done()
decode:
for {
err := d.client.Context().Err()
if err != nil {
klog.Errorf("client: context error: %s\n", err)
return watch.Error, nil, err
var evt *resource.WatchEvent
var err error
select {
case <-d.client.Context().Done():
default:
evt, err = d.client.Recv()
}
evt, err := d.client.Recv()
if errors.Is(err, io.EOF) {
switch {
case errors.Is(d.client.Context().Err(), context.Canceled):
return watch.Error, nil, io.EOF
case d.client.Context().Err() != nil:
return watch.Error, nil, d.client.Context().Err()
case errors.Is(err, io.EOF):
return watch.Error, nil, io.EOF
case grpcStatus.Code(err) == grpcCodes.Canceled:
return watch.Error, nil, err
}
if grpcStatus.Code(err) == grpcCodes.Canceled {
return watch.Error, nil, err
}
if err != nil {
case err != nil:
klog.Errorf("client: error receiving result: %s", err)
return watch.Error, nil, err
}
@ -194,10 +212,15 @@ decode:
}
func (d *streamDecoder) Close() {
// Close the send stream
err := d.client.CloseSend()
if err != nil {
klog.Errorf("error closing watch stream: %s", err)
}
// Cancel the watch context
d.cancelWatch()
// Wait for all decode operations to finish
d.done.Wait()
}
var _ watch.Decoder = (*streamDecoder)(nil)
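
The Close sequence above (close the send side, cancel the watch context, wait for in-flight Decode calls) is what lets the stream watcher stop without leaking a blocked Recv. A self-contained sketch of that cancel-and-wait pattern, with toy types standing in for the gRPC stream:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// toyStream stands in for a gRPC client stream: Recv blocks until an item
// arrives or the stream context is cancelled.
type toyStream struct {
	ctx   context.Context
	items chan string
}

func (s *toyStream) Recv() (string, error) {
	select {
	case <-s.ctx.Done():
		return "", s.ctx.Err()
	case it := <-s.items:
		return it, nil
	}
}

type toyDecoder struct {
	stream *toyStream
	cancel context.CancelFunc
	done   sync.WaitGroup
}

func (d *toyDecoder) Decode() (string, error) {
	d.done.Add(1)
	defer d.done.Done()
	return d.stream.Recv()
}

// Close mirrors the cancel-and-wait part of streamDecoder.Close: cancel the
// context so a blocked Recv returns, then wait for any in-flight Decode.
func (d *toyDecoder) Close() {
	d.cancel()
	d.done.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	d := &toyDecoder{stream: &toyStream{ctx: ctx, items: make(chan string)}, cancel: cancel}
	go func() {
		if _, err := d.Decode(); err != nil {
			fmt.Println("decode stopped:", err) // prints "context canceled"
		}
	}()
	time.Sleep(10 * time.Millisecond) // let the Decode goroutine block in Recv
	d.Close()                         // returns only after the pending Decode has observed the cancel
}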

View File

@ -7,9 +7,9 @@ package apistore
import (
"context"
"fmt"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -29,7 +29,20 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
storagetesting "github.com/grafana/grafana/pkg/apiserver/storage/testing"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
"github.com/grafana/grafana/pkg/tests/testsuite"
)
type StorageType string
const (
StorageTypeFile StorageType = "file"
StorageTypeUnified StorageType = "unified"
)
var scheme = runtime.NewScheme()
@ -48,6 +61,7 @@ type setupOptions struct {
prefix string
resourcePrefix string
groupResource schema.GroupResource
storageType StorageType
}
type setupOption func(*setupOptions, testing.TB)
@ -59,10 +73,20 @@ func withDefaults(options *setupOptions, t testing.TB) {
options.prefix = t.TempDir()
options.resourcePrefix = storagetesting.KeyFunc("", "")
options.groupResource = schema.GroupResource{Resource: "pods"}
options.storageType = StorageTypeFile
}
func withStorageType(storageType StorageType) setupOption {
return func(options *setupOptions, t testing.TB) {
options.storageType = storageType
}
}
var _ setupOption = withDefaults
func TestMain(m *testing.M) {
testsuite.Run(m)
}
func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Interface, factory.DestroyFunc, error) {
setupOpts := setupOptions{}
opts = append([]setupOption{withDefaults}, opts...)
@ -85,18 +109,56 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
Metadata: fileblob.MetadataDontWrite, // skip
})
require.NoError(t, err)
fmt.Printf("ROOT: %s\n\n", tmp)
}
ctx := storagetesting.NewContext()
backend, err := resource.NewCDKBackend(ctx, resource.CDKBackendOptions{
Bucket: bucket,
})
require.NoError(t, err)
server, err := resource.NewResourceServer(resource.ResourceServerOptions{
Backend: backend,
})
require.NoError(t, err)
var server resource.ResourceServer
switch setupOpts.storageType {
case StorageTypeFile:
backend, err := resource.NewCDKBackend(ctx, resource.CDKBackendOptions{
Bucket: bucket,
})
require.NoError(t, err)
server, err = resource.NewResourceServer(resource.ResourceServerOptions{
Backend: backend,
})
require.NoError(t, err)
// Issue a health check to ensure the server is initialized
_, err = server.IsHealthy(ctx, &resource.HealthCheckRequest{})
require.NoError(t, err)
case StorageTypeUnified:
if testing.Short() {
t.Skip("skipping integration test")
}
dbstore := infraDB.InitTestDB(t)
cfg := setting.NewCfg()
features := featuremgmt.WithFeatures()
eDB, err := dbimpl.ProvideResourceDB(dbstore, cfg, features, nil)
require.NoError(t, err)
require.NotNil(t, eDB)
ret, err := sql.NewBackend(sql.BackendOptions{
DBProvider: eDB,
PollingInterval: time.Millisecond, // Keep this fast
})
require.NoError(t, err)
require.NotNil(t, ret)
ctx := storagetesting.NewContext()
err = ret.Init(ctx)
require.NoError(t, err)
server, err = resource.NewResourceServer(resource.ResourceServerOptions{
Backend: ret,
Diagnostics: ret,
Lifecycle: ret,
})
require.NoError(t, err)
default:
t.Fatalf("unsupported storage type: %s", setupOpts.storageType)
}
client := resource.NewLocalResourceClient(server)
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
@ -124,55 +186,82 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
}
func TestWatch(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestWatch(ctx, t, store)
for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} {
t.Run(string(s), func(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t, withStorageType(s))
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestWatch(ctx, t, store)
})
}
}
func TestClusterScopedWatch(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestClusterScopedWatch(ctx, t, store)
for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} {
t.Run(string(s), func(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t, withStorageType(s))
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestClusterScopedWatch(ctx, t, store)
})
}
}
func TestNamespaceScopedWatch(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestNamespaceScopedWatch(ctx, t, store)
for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} {
t.Run(string(s), func(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t, withStorageType(s))
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestNamespaceScopedWatch(ctx, t, store)
})
}
}
func TestDeleteTriggerWatch(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestDeleteTriggerWatch(ctx, t, store)
for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} {
t.Run(string(s), func(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t, withStorageType(s))
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestDeleteTriggerWatch(ctx, t, store)
})
}
}
func TestWatchFromZero(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestWatchFromZero(ctx, t, store, nil)
}
// Not Supported by unistore because there is no way to differentiate between:
// - SendInitialEvents=nil && resourceVersion=0
// - sendInitialEvents=false && resourceVersion=0
// This is a Legacy feature in k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go#196
// func TestWatchFromZero(t *testing.T) {
// ctx, store, destroyFunc, err := testSetup(t)
// defer destroyFunc()
// assert.NoError(t, err)
// storagetesting.RunTestWatchFromZero(ctx, t, store, nil)
// }
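
The ambiguity called out above comes from how ListOptions maps onto the unistore watch request: a nil SendInitialEvents is translated to false, so both legacy-style forms below end up identical on the wire. A small illustration, assuming k8s.io/utils/ptr for the pointer helper:

// Both options end up as WatchRequest{Since: 0, SendInitialEvents: false},
// so unistore cannot distinguish the legacy "RV=0, SendInitialEvents unset"
// request from an explicit opt-out.
legacy := storage.ListOptions{ResourceVersion: "0"} // SendInitialEvents == nil
optOut := storage.ListOptions{ResourceVersion: "0", SendInitialEvents: ptr.To(false)}
_, _ = legacy, optOut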
// TestWatchFromNonZero tests that
// - watch from non-0 should just watch changes after given version
func TestWatchFromNonZero(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestWatchFromNonZero(ctx, t, store)
for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} {
t.Run(string(s), func(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t, withStorageType(s))
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestWatchFromNonZero(ctx, t, store)
})
}
}
/*
Only valid when using a cached storage
func TestDelayedWatchDelivery(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestDelayedWatchDelivery(ctx, t, store)
}
*/
/*
func TestWatchError(t *testing.T) {
@ -182,24 +271,36 @@ func TestWatchError(t *testing.T) {
*/
func TestWatchContextCancel(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestWatchContextCancel(ctx, t, store)
for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} {
t.Run(string(s), func(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t, withStorageType(s))
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestWatchContextCancel(ctx, t, store)
})
}
}
func TestWatcherTimeout(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestWatcherTimeout(ctx, t, store)
for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} {
t.Run(string(s), func(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t, withStorageType(s))
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestWatcherTimeout(ctx, t, store)
})
}
}
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, store)
for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} {
t.Run(string(s), func(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t, withStorageType(s))
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, store)
})
}
}
// TODO: enable when we support flow control and priority fairness
@ -221,31 +322,47 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
// setting allowWatchBookmarks query param against
// etcd implementation doesn't have any effect.
func TestWatchDispatchBookmarkEvents(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestWatchDispatchBookmarkEvents(ctx, t, store, false)
for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} {
t.Run(string(s), func(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t, withStorageType(s))
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestWatchDispatchBookmarkEvents(ctx, t, store, false)
})
}
}
func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} {
t.Run(string(s), func(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t, withStorageType(s))
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
})
}
}
func TestEtcdWatchSemantics(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunWatchSemantics(ctx, t, store)
for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} {
t.Run(string(s), func(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t, withStorageType(s))
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunWatchSemantics(ctx, t, store)
})
}
}
func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)
for _, s := range []StorageType{StorageTypeFile, StorageTypeUnified} {
t.Run(string(s), func(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t, withStorageType(s))
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)
})
}
}
func newPod() runtime.Object {

View File

@ -1,376 +0,0 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/tilt-dev/tilt-apiserver/blob/main/pkg/storage/filepath/watchset.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Kubernetes Authors.
package apistore
import (
"context"
"fmt"
"sync"
"sync/atomic"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/klog/v2"
)
const (
UpdateChannelSize = 25
InitialWatchNodesSize = 20
InitialBufferedEventsSize = 25
)
type eventWrapper struct {
ev watch.Event
// optional: oldObject is only set for modifications for determining their type as necessary (when using predicate filtering)
oldObject runtime.Object
}
type watchNode struct {
ctx context.Context
s *WatchSet
id uint64
updateCh chan eventWrapper
outCh chan watch.Event
requestedRV uint64
// the watch may or may not be namespaced for a namespaced resource. This is always nil for cluster-scoped kinds
watchNamespace *string
predicate storage.SelectionPredicate
versioner storage.Versioner
}
// Keeps track of which watches need to be notified
type WatchSet struct {
mu sync.RWMutex
// mu protects both nodes and counter
nodes map[uint64]*watchNode
counter atomic.Uint64
buffered []eventWrapper
bufferedMutex sync.RWMutex
}
func NewWatchSet() *WatchSet {
return &WatchSet{
buffered: make([]eventWrapper, 0, InitialBufferedEventsSize),
nodes: make(map[uint64]*watchNode, InitialWatchNodesSize),
}
}
// Creates a new watch with a unique id, but
// does not start sending events to it until start() is called.
func (s *WatchSet) newWatch(ctx context.Context, requestedRV uint64, p storage.SelectionPredicate, versioner storage.Versioner, namespace *string) *watchNode {
s.counter.Add(1)
node := &watchNode{
ctx: ctx,
requestedRV: requestedRV,
id: s.counter.Load(),
s: s,
// updateCh size needs to be > 1 to allow slower clients to not block passing new events
updateCh: make(chan eventWrapper, UpdateChannelSize),
// outCh size needs to be > 1 for single process use-cases such as tests where watch and event seeding from CUD
// events is happening on the same thread
outCh: make(chan watch.Event, UpdateChannelSize),
predicate: p,
watchNamespace: namespace,
versioner: versioner,
}
return node
}
func (s *WatchSet) cleanupWatchers() {
s.mu.Lock()
defer s.mu.Unlock()
for _, w := range s.nodes {
w.stop()
}
}
// oldObject is only passed in the event of a modification
// in case a predicate filtered watch is impacted as a result of modification
// NOTE: this function gives one the misperception that a newly added node will never
// get a double event, one from buffered and one from the update channel
// That perception is not true. Even though this function maintains the lock throughout the function body
// it is not true of the Start function. So basically, the Start function running after this function
// fully stands the chance of another future notifyWatchers double sending it the event through the two means mentioned
func (s *WatchSet) notifyWatchers(ev watch.Event, oldObject runtime.Object) {
s.mu.RLock()
defer s.mu.RUnlock()
updateEv := eventWrapper{
ev: ev,
}
if oldObject != nil {
updateEv.oldObject = oldObject
}
// Events are always buffered.
// this is because of an inadvertent delay which is built into the watch process
// Watch() from storage returns Watch.Interface with a async start func.
// The only way to guarantee that we can interpret the passed RV correctly is to play it against missed events
// (notice the loop below over s.nodes isn't exactly going to work on a new node
// unless start is called on it)
s.bufferedMutex.Lock()
s.buffered = append(s.buffered, updateEv)
s.bufferedMutex.Unlock()
for _, w := range s.nodes {
w.updateCh <- updateEv
}
}
// isValid is not necessary to be called on oldObject in UpdateEvents - assuming the Watch pushes correctly setup eventWrapper our way
// first bool is whether the event is valid for current watcher
// second bool is whether checking the old value against the predicate may be valuable to the caller
// second bool may be a helpful aid to establish context around MODIFIED events
// (note that this second bool is only marked true if we pass other checks first, namely RV and namespace)
func (w *watchNode) isValid(e eventWrapper) (bool, bool, error) {
obj, err := meta.Accessor(e.ev.Object)
if err != nil {
klog.Error("Could not get accessor to object in event")
return false, false, nil
}
eventRV, err := w.getResourceVersionAsInt(e.ev.Object)
if err != nil {
return false, false, err
}
if eventRV < w.requestedRV {
return false, false, nil
}
if w.watchNamespace != nil && *w.watchNamespace != obj.GetNamespace() {
return false, false, err
}
valid, err := w.predicate.Matches(e.ev.Object)
if err != nil {
return false, false, err
}
return valid, e.ev.Type == watch.Modified, nil
}
// Only call this method if current object matches the predicate
func (w *watchNode) handleAddedForFilteredList(e eventWrapper) (*watch.Event, error) {
if e.oldObject == nil {
return nil, fmt.Errorf("oldObject should be set for modified events")
}
ok, err := w.predicate.Matches(e.oldObject)
if err != nil {
return nil, err
}
if !ok {
e.ev.Type = watch.Added
return &e.ev, nil
}
return nil, nil
}
func (w *watchNode) handleDeletedForFilteredList(e eventWrapper) (*watch.Event, error) {
if e.oldObject == nil {
return nil, fmt.Errorf("oldObject should be set for modified events")
}
ok, err := w.predicate.Matches(e.oldObject)
if err != nil {
return nil, err
}
if !ok {
return nil, nil
}
// isn't a match but used to be
e.ev.Type = watch.Deleted
oldObjectAccessor, err := meta.Accessor(e.oldObject)
if err != nil {
klog.Errorf("Could not get accessor to correct the old RV of filtered out object")
return nil, err
}
currentRV, err := getResourceVersion(e.ev.Object)
if err != nil {
klog.Errorf("Could not get accessor to object in event")
return nil, err
}
oldObjectAccessor.SetResourceVersion(currentRV)
e.ev.Object = e.oldObject
return &e.ev, nil
}
func (w *watchNode) processEvent(e eventWrapper, isInitEvent bool) error {
if isInitEvent {
// Init events have already been vetted against the predicate and other RV behavior
// Let them pass through
w.outCh <- e.ev
return nil
}
valid, runDeleteFromFilteredListHandler, err := w.isValid(e)
if err != nil {
klog.Errorf("Could not determine validity of the event: %v", err)
return err
}
if valid {
if e.ev.Type == watch.Modified {
ev, err := w.handleAddedForFilteredList(e)
if err != nil {
return err
}
if ev != nil {
w.outCh <- *ev
} else {
// forward the original event if add handling didn't signal any impact
w.outCh <- e.ev
}
} else {
w.outCh <- e.ev
}
return nil
}
if runDeleteFromFilteredListHandler {
if e.ev.Type == watch.Modified {
ev, err := w.handleDeletedForFilteredList(e)
if err != nil {
return err
}
if ev != nil {
w.outCh <- *ev
}
} // explicitly doesn't have an event forward for the else case here
return nil
}
return nil
}
// Start sending events to this watch.
func (w *watchNode) Start(initEvents ...watch.Event) {
w.s.mu.Lock()
w.s.nodes[w.id] = w
w.s.mu.Unlock()
go func() {
maxRV := uint64(0)
for _, ev := range initEvents {
currentRV, err := w.getResourceVersionAsInt(ev.Object)
if err != nil {
klog.Errorf("Could not determine init event RV for deduplication of buffered events: %v", err)
continue
}
if maxRV < currentRV {
maxRV = currentRV
}
if err := w.processEvent(eventWrapper{ev: ev}, true); err != nil {
klog.Errorf("Could not process event: %v", err)
}
}
// If we had no init events, simply rely on the passed RV
if maxRV == 0 {
maxRV = w.requestedRV
}
w.s.bufferedMutex.RLock()
for _, e := range w.s.buffered {
eventRV, err := w.getResourceVersionAsInt(e.ev.Object)
if err != nil {
klog.Errorf("Could not determine RV for deduplication of buffered events: %v", err)
continue
}
if maxRV >= eventRV {
continue
} else {
maxRV = eventRV
}
if err := w.processEvent(e, false); err != nil {
klog.Errorf("Could not process event: %v", err)
}
}
w.s.bufferedMutex.RUnlock()
for {
select {
case e, ok := <-w.updateCh:
if !ok {
close(w.outCh)
return
}
eventRV, err := w.getResourceVersionAsInt(e.ev.Object)
if err != nil {
klog.Errorf("Could not determine RV for deduplication of channel events: %v", err)
continue
}
if maxRV >= eventRV {
continue
} else {
maxRV = eventRV
}
if err := w.processEvent(e, false); err != nil {
klog.Errorf("Could not process event: %v", err)
}
case <-w.ctx.Done():
close(w.outCh)
return
}
}
}()
}
func (w *watchNode) Stop() {
w.s.mu.Lock()
defer w.s.mu.Unlock()
w.stop()
}
// Unprotected func: ensure mutex on the parent watch set is locked before calling
func (w *watchNode) stop() {
if _, ok := w.s.nodes[w.id]; ok {
delete(w.s.nodes, w.id)
close(w.updateCh)
}
}
func (w *watchNode) ResultChan() <-chan watch.Event {
return w.outCh
}
func getResourceVersion(obj runtime.Object) (string, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
klog.Error("Could not get accessor to object in event")
return "", err
}
return accessor.GetResourceVersion(), nil
}
func (w *watchNode) getResourceVersionAsInt(obj runtime.Object) (uint64, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
klog.Error("Could not get accessor to object in event")
return 0, err
}
return w.versioner.ParseResourceVersion(accessor.GetResourceVersion())
}

View File

@ -7,6 +7,7 @@ import (
"log/slog"
"net/http"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel/trace"
@ -144,14 +145,15 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
var _ ResourceServer = &server{}
type server struct {
tracer trace.Tracer
log *slog.Logger
backend StorageBackend
index ResourceIndexServer
diagnostics DiagnosticsServer
access WriteAccessHooks
lifecycle LifecycleHooks
now func() int64
tracer trace.Tracer
log *slog.Logger
backend StorageBackend
index ResourceIndexServer
diagnostics DiagnosticsServer
access WriteAccessHooks
lifecycle LifecycleHooks
now func() int64
mostRecentRV atomic.Int64 // The most recent resource version seen by the server
// Background watch task -- this has permissions for everything
ctx context.Context
@ -326,12 +328,12 @@ func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateRespons
rsp.Error = e
return rsp, nil
}
var err error
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, *event)
if err != nil {
rsp.Error = AsErrorResult(err)
}
s.log.Debug("server.WriteEvent", "type", event.Type, "rv", rsp.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "name", event.Key.Name, "resource", event.Key.Resource)
return rsp, nil
}
@ -537,6 +539,8 @@ func (s *server) initWatcher() error {
for {
// pipe all events
v := <-events
s.log.Debug("Server. Streaming Event", "type", v.Type, "previousRV", v.PreviousRV, "group", v.Key.Group, "namespace", v.Key.Namespace, "resource", v.Key.Resource, "name", v.Key.Name)
s.mostRecentRV.Store(v.ResourceVersion)
out <- v
}
}()
@ -552,23 +556,67 @@ func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
return err
}
// Start listening -- this will buffer any changes that happen while we backfill
// Start listening -- this will buffer any changes that happen while we backfill.
// If events are generated faster than we can process them, then some events will be dropped.
// TODO: Think of a way to allow the client to catch up.
stream, err := s.broadcaster.Subscribe(ctx)
if err != nil {
return err
}
defer s.broadcaster.Unsubscribe(stream)
since := req.Since
if req.SendInitialEvents {
fmt.Printf("TODO... query\n")
// All initial events are CREATE
if !req.SendInitialEvents && req.Since == 0 {
// This is a temporary hack only relevant for tests to ensure that the first events are sent.
// This is required because the SQL backend polls the database every 100ms.
// TODO: Implement a getLatestResourceVersion method in the backend.
time.Sleep(10 * time.Millisecond)
}
if req.AllowWatchBookmarks {
fmt.Printf("TODO... send bookmark\n")
mostRecentRV := s.mostRecentRV.Load() // get the latest resource version
var initialEventsRV int64 // resource version coming from the initial events
if req.SendInitialEvents {
// Backfill the stream by sending every existing entity.
initialEventsRV, err = s.backend.ListIterator(ctx, &ListRequest{Options: req.Options}, func(iter ListIterator) error {
for iter.Next() {
if err := iter.Error(); err != nil {
return err
}
if err := srv.Send(&WatchEvent{
Type: WatchEvent_ADDED,
Resource: &WatchEvent_Resource{
Value: iter.Value(),
Version: iter.ResourceVersion(),
},
}); err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
}
if req.SendInitialEvents && req.AllowWatchBookmarks {
if err := srv.Send(&WatchEvent{
Type: WatchEvent_BOOKMARK,
Resource: &WatchEvent_Resource{
Version: initialEventsRV,
},
}); err != nil {
return err
}
}
var since int64 // resource version to start watching from
switch {
case req.SendInitialEvents:
since = initialEventsRV
case req.Since == 0:
since = mostRecentRV
default:
since = req.Since
}
for {
select {
case <-ctx.Done():
@ -579,23 +627,39 @@ func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
s.log.Debug("watch events closed")
return nil
}
s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name)
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
// Currently sending *every* event
// if req.Options.Labels != nil {
// // match *either* the old or new object
// }
// TODO: return values that match either the old or the new
if err := srv.Send(&WatchEvent{
value := event.Value
// remove the delete marker stored in the value for deleted objects
if event.Type == WatchEvent_DELETED {
value = []byte{}
}
resp := &WatchEvent{
Timestamp: event.Timestamp,
Type: event.Type,
Resource: &WatchEvent_Resource{
Value: event.Value,
Value: value,
Version: event.ResourceVersion,
},
// TODO... previous???
}); err != nil {
}
if event.PreviousRV > 0 {
prevObj, err := s.Read(ctx, &ReadRequest{Key: event.Key, ResourceVersion: event.PreviousRV})
if err != nil {
// This scenario should never happen, but if it does, we should log it and continue
// sending the event without the previous object. The client will decide what to do.
s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", prevObj.Error)
} else {
if prevObj.ResourceVersion != event.PreviousRV {
s.log.Error("resource version mismatch", "key", event.Key, "resource_version", event.PreviousRV, "actual", prevObj.ResourceVersion)
return fmt.Errorf("resource version mismatch")
}
resp.Previous = &WatchEvent_Resource{
Value: prevObj.Value,
Version: prevObj.ResourceVersion,
}
}
}
if err := srv.Send(resp); err != nil {
return err
}
}
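
Taken together, a watcher that requests the initial state sees a run of ADDED events, then (when bookmarks are allowed) a BOOKMARK carrying the snapshot's resource version, and then live events with Previous populated whenever the event carries a PreviousRV. A rough client-side sketch against the generated resource client; everything beyond the resource types themselves is illustrative:

// Assumed imports: context, errors, io, and this resource package.
func tailResource(ctx context.Context, client resource.ResourceClient, key *resource.ResourceKey) error {
	stream, err := client.Watch(ctx, &resource.WatchRequest{
		Options:             &resource.ListOptions{Key: key},
		SendInitialEvents:   true, // backfill the current state as ADDED events
		AllowWatchBookmarks: true, // then send a BOOKMARK with the snapshot RV
	})
	if err != nil {
		return err
	}
	for {
		evt, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return nil // server closed the stream
		}
		if err != nil {
			return err
		}
		switch evt.Type {
		case resource.WatchEvent_BOOKMARK:
			// end of the initial snapshot; evt.Resource.Version is the RV to resume from (Since)
		default:
			_ = evt // live event: evt.Resource is the new value, evt.Previous the prior one when present
		}
	}
}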

View File

@ -22,6 +22,7 @@ import (
)
const trace_prefix = "sql.resource."
const defaultPollingInterval = 100 * time.Millisecond
type Backend interface {
resource.StorageBackend
@ -30,8 +31,9 @@ type Backend interface {
}
type BackendOptions struct {
DBProvider db.DBProvider
Tracer trace.Tracer
DBProvider db.DBProvider
Tracer trace.Tracer
PollingInterval time.Duration
}
func NewBackend(opts BackendOptions) (Backend, error) {
@ -43,12 +45,17 @@ func NewBackend(opts BackendOptions) (Backend, error) {
}
ctx, cancel := context.WithCancel(context.Background())
pollingInterval := opts.PollingInterval
if pollingInterval == 0 {
pollingInterval = defaultPollingInterval
}
return &backend{
done: ctx.Done(),
cancel: cancel,
log: log.New("sql-resource-server"),
tracer: opts.Tracer,
dbProvider: opts.DBProvider,
done: ctx.Done(),
cancel: cancel,
log: log.New("sql-resource-server"),
tracer: opts.Tracer,
dbProvider: opts.DBProvider,
pollingInterval: pollingInterval,
}, nil
}
@ -70,6 +77,7 @@ type backend struct {
// watch streaming
//stream chan *resource.WatchEvent
pollingInterval time.Duration
}
func (b *backend) Init(ctx context.Context) error {
@ -180,7 +188,6 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
return nil
})
return newVersion, err
}
@ -512,8 +519,7 @@ func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.Writte
}
func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan<- *resource.WrittenEvent) {
interval := 100 * time.Millisecond // TODO make this configurable
t := time.NewTicker(interval)
t := time.NewTicker(b.pollingInterval)
defer close(stream)
defer t.Stop()
@ -526,7 +532,7 @@ func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan
grv, err := b.listLatestRVs(ctx)
if err != nil {
b.log.Error("get the latest resource version", "err", err)
t.Reset(interval)
t.Reset(b.pollingInterval)
continue
}
for group, items := range grv {
@ -543,7 +549,7 @@ func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan
next, err := b.poll(ctx, group, resource, since[group][resource], stream)
if err != nil {
b.log.Error("polling for resource", "err", err)
t.Reset(interval)
t.Reset(b.pollingInterval)
continue
}
if next > since[group][resource] {
@ -552,7 +558,7 @@ func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan
}
}
t.Reset(interval)
t.Reset(b.pollingInterval)
}
}
}
@ -636,7 +642,8 @@ func (b *backend) poll(ctx context.Context, grp string, res string, since int64,
Resource: rec.Key.Resource,
Name: rec.Key.Name,
},
Type: resource.WatchEvent_Type(rec.Action),
Type: resource.WatchEvent_Type(rec.Action),
PreviousRV: rec.PreviousRV,
},
ResourceVersion: rec.ResourceVersion,
// Timestamp: , // TODO: add timestamp
@ -663,15 +670,16 @@ func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemp
if errors.Is(err, sql.ErrNoRows) {
// if there wasn't a row associated with the given resource, we create one with
// version 1
// version 2 to match the etcd behavior.
if _, err = dbutil.Exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionRequest{
SQLTemplate: sqltemplate.New(d),
Group: key.Group,
Resource: key.Resource,
SQLTemplate: sqltemplate.New(d),
Group: key.Group,
Resource: key.Resource,
resourceVersion: &resourceVersion{1},
}); err != nil {
return 0, fmt.Errorf("insert into resource_version: %w", err)
}
return 1, nil
return 2, nil
}
if err != nil {

View File

@ -227,7 +227,7 @@ func TestResourceVersionAtomicInc(t *testing.T) {
v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey)
require.NoError(t, err)
require.Equal(t, int64(1), v)
require.Equal(t, int64(2), v)
})
t.Run("happy path - update existing row", func(t *testing.T) {
@ -304,7 +304,7 @@ func TestBackend_create(t *testing.T) {
v, err := b.create(ctx, event)
require.NoError(t, err)
require.Equal(t, int64(1), v)
require.Equal(t, int64(2), v)
})
t.Run("error inserting into resource", func(t *testing.T) {
@ -409,7 +409,7 @@ func TestBackend_update(t *testing.T) {
v, err := b.update(ctx, event)
require.NoError(t, err)
require.Equal(t, int64(1), v)
require.Equal(t, int64(2), v)
})
t.Run("error in first update to resource", func(t *testing.T) {
@ -513,7 +513,7 @@ func TestBackend_delete(t *testing.T) {
v, err := b.delete(ctx, event)
require.NoError(t, err)
require.Equal(t, int64(1), v)
require.Equal(t, int64(2), v)
})
t.Run("error deleting resource", func(t *testing.T) {

View File

@ -6,6 +6,7 @@ INSERT INTO {{ .Ident "resource_history" }}
{{ .Ident "namespace" }},
{{ .Ident "name" }},
{{ .Ident "previous_resource_version"}},
{{ .Ident "value" }},
{{ .Ident "action" }}
)
@ -17,6 +18,7 @@ INSERT INTO {{ .Ident "resource_history" }}
{{ .Arg .WriteEvent.Key.Namespace }},
{{ .Arg .WriteEvent.Key.Name }},
{{ .Arg .WriteEvent.PreviousRV }},
{{ .Arg .WriteEvent.Value }},
{{ .Arg .WriteEvent.Type }}
)

View File

@ -5,7 +5,8 @@ SELECT
{{ .Ident "resource" | .Into .Response.Key.Resource }},
{{ .Ident "name" | .Into .Response.Key.Name }},
{{ .Ident "value" | .Into .Response.Value }},
{{ .Ident "action" | .Into .Response.Action }}
{{ .Ident "action" | .Into .Response.Action }},
{{ .Ident "previous_resource_version" | .Into .Response.PreviousRV }}
FROM {{ .Ident "resource_history" }}
WHERE 1 = 1

View File

@ -7,6 +7,7 @@ INSERT INTO {{ .Ident "resource" }}
{{ .Ident "namespace" }},
{{ .Ident "name" }},
{{ .Ident "previous_resource_version" }},
{{ .Ident "value" }},
{{ .Ident "action" }}
)
@ -17,6 +18,7 @@ INSERT INTO {{ .Ident "resource" }}
{{ .Arg .WriteEvent.Key.Namespace }},
{{ .Arg .WriteEvent.Key.Name }},
{{ .Arg .WriteEvent.PreviousRV }},
{{ .Arg .WriteEvent.Value }},
{{ .Arg .WriteEvent.Type }}
)

View File

@ -8,6 +8,6 @@ INSERT INTO {{ .Ident "resource_version" }}
VALUES (
{{ .Arg .Group }},
{{ .Arg .Resource }},
1
2
)
;

View File

@ -10,8 +10,7 @@ func initResourceTables(mg *migrator.Migrator) string {
marker := "Initialize resource tables"
mg.AddMigration(marker, &migrator.RawSQLMigration{})
tables := []migrator.Table{}
tables = append(tables, migrator.Table{
resource_table := migrator.Table{
Name: "resource",
Columns: []*migrator.Column{
// primary identifier
@ -33,9 +32,8 @@ func initResourceTables(mg *migrator.Migrator) string {
Indices: []*migrator.Index{
{Cols: []string{"namespace", "group", "resource", "name"}, Type: migrator.UniqueIndex},
},
})
tables = append(tables, migrator.Table{
}
resource_history_table := migrator.Table{
Name: "resource_history",
Columns: []*migrator.Column{
// primary identifier
@ -62,7 +60,9 @@ func initResourceTables(mg *migrator.Migrator) string {
// index to support watch poller
{Cols: []string{"resource_version"}, Type: migrator.IndexType},
},
})
}
tables := []migrator.Table{resource_table, resource_history_table}
// tables = append(tables, migrator.Table{
// Name: "resource_label_set",
@ -97,5 +97,13 @@ func initResourceTables(mg *migrator.Migrator) string {
}
}
mg.AddMigration("Add column previous_resource_version in resource_history", migrator.NewAddColumnMigration(resource_history_table, &migrator.Column{
Name: "previous_resource_version", Type: migrator.DB_BigInt, Nullable: false,
}))
mg.AddMigration("Add column previous_resource_version in resource", migrator.NewAddColumnMigration(resource_table, &migrator.Column{
Name: "previous_resource_version", Type: migrator.DB_BigInt, Nullable: false,
}))
return marker
}

View File

@ -70,6 +70,7 @@ func (r sqlResourceRequest) Validate() error {
type historyPollResponse struct {
Key resource.ResourceKey
ResourceVersion int64
PreviousRV int64
Value []byte
Action int
}
@ -101,6 +102,7 @@ func (r *sqlResourceHistoryPollRequest) Results() (*historyPollResponse, error)
Name: r.Response.Key.Name,
},
ResourceVersion: r.Response.ResourceVersion,
PreviousRV: r.Response.PreviousRV,
Value: r.Response.Value,
Action: r.Response.Action,
}, nil

View File

@ -104,6 +104,18 @@ func TestUnifiedStorageQueries(t *testing.T) {
},
},
},
sqlResourceHistoryPoll: {
{
Name: "single path",
Data: &sqlResourceHistoryPollRequest{
SQLTemplate: mocks.NewTestingSQLTemplate(),
Resource: "res",
Group: "group",
SinceResourceVersion: 1234,
Response: new(historyPollResponse),
},
},
},
sqlResourceUpdateRV: {
{
@ -143,7 +155,8 @@ func TestUnifiedStorageQueries(t *testing.T) {
Data: &sqlResourceRequest{
SQLTemplate: mocks.NewTestingSQLTemplate(),
WriteEvent: resource.WriteEvent{
Key: &resource.ResourceKey{},
Key: &resource.ResourceKey{},
PreviousRV: 1234,
},
},
},

View File

@ -5,6 +5,7 @@ INSERT INTO `resource_history`
`resource`,
`namespace`,
`name`,
`previous_resource_version`,
`value`,
`action`
)
@ -14,6 +15,7 @@ INSERT INTO `resource_history`
'',
'',
'',
1234,
'[]',
'UNKNOWN'
)

View File

@ -0,0 +1,16 @@
SELECT
`resource_version`,
`namespace`,
`group`,
`resource`,
`name`,
`value`,
`action`,
`previous_resource_version`
FROM `resource_history`
WHERE 1 = 1
AND `group` = 'group'
AND `resource` = 'res'
AND `resource_version` > 1234
ORDER BY `resource_version` ASC
;

View File

@ -5,6 +5,7 @@ INSERT INTO `resource`
`resource`,
`namespace`,
`name`,
`previous_resource_version`,
`value`,
`action`
)
@ -14,6 +15,7 @@ INSERT INTO `resource`
'rr',
'nn',
'name',
123,
'[]',
'ADDED'
)

View File

@ -7,6 +7,6 @@ INSERT INTO `resource_version`
VALUES (
'',
'',
1
2
)
;

View File

@ -5,6 +5,7 @@ INSERT INTO "resource_history"
"resource",
"namespace",
"name",
"previous_resource_version",
"value",
"action"
)
@ -14,6 +15,7 @@ INSERT INTO "resource_history"
'',
'',
'',
1234,
'[]',
'UNKNOWN'
)

View File

@ -0,0 +1,16 @@
SELECT
"resource_version",
"namespace",
"group",
"resource",
"name",
"value",
"action",
"previous_resource_version"
FROM "resource_history"
WHERE 1 = 1
AND "group" = 'group'
AND "resource" = 'res'
AND "resource_version" > 1234
ORDER BY "resource_version" ASC
;

View File

@ -5,6 +5,7 @@ INSERT INTO "resource"
"resource",
"namespace",
"name",
"previous_resource_version",
"value",
"action"
)
@ -14,6 +15,7 @@ INSERT INTO "resource"
'rr',
'nn',
'name',
123,
'[]',
'ADDED'
)

View File

@ -7,6 +7,6 @@ INSERT INTO "resource_version"
VALUES (
'',
'',
1
2
)
;

View File

@ -5,6 +5,7 @@ INSERT INTO "resource_history"
"resource",
"namespace",
"name",
"previous_resource_version",
"value",
"action"
)
@ -14,6 +15,7 @@ INSERT INTO "resource_history"
'',
'',
'',
1234,
'[]',
'UNKNOWN'
)

View File

@ -0,0 +1,16 @@
SELECT
"resource_version",
"namespace",
"group",
"resource",
"name",
"value",
"action",
"previous_resource_version"
FROM "resource_history"
WHERE 1 = 1
AND "group" = 'group'
AND "resource" = 'res'
AND "resource_version" > 1234
ORDER BY "resource_version" ASC
;

View File

@ -5,6 +5,7 @@ INSERT INTO "resource"
"resource",
"namespace",
"name",
"previous_resource_version",
"value",
"action"
)
@ -14,6 +15,7 @@ INSERT INTO "resource"
'rr',
'nn',
'name',
123,
'[]',
'ADDED'
)

View File

@ -7,6 +7,6 @@ INSERT INTO "resource_version"
VALUES (
'',
'',
1
2
)
;