From 8c1f60aa8b7de8acfdf228b655064e711f7fa80a Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Tue, 2 Jul 2024 14:08:34 -0700 Subject: [PATCH] remove eventID --- pkg/registry/apis/dashboard/legacy/storage.go | 2 +- pkg/storage/unified/resource/cdk_appender.go | 25 +++-------------- pkg/storage/unified/resource/event.go | 2 -- pkg/storage/unified/resource/server.go | 27 ------------------- pkg/storage/unified/sqlnext/sql_resources.go | 1 - 5 files changed, 5 insertions(+), 52 deletions(-) diff --git a/pkg/registry/apis/dashboard/legacy/storage.go b/pkg/registry/apis/dashboard/legacy/storage.go index 693889d38dd..fee75814046 100644 --- a/pkg/registry/apis/dashboard/legacy/storage.go +++ b/pkg/registry/apis/dashboard/legacy/storage.go @@ -54,7 +54,7 @@ func (a *dashboardSqlAccess) WriteEvent(ctx context.Context, event resource.Writ case resource.WatchEvent_DELETED: { _, _, err = a.DeleteDashboard(ctx, info.OrgID, event.Key.Name) - rv = event.EventID + //rv = ??? } // The difference depends on embedded internal ID case resource.WatchEvent_ADDED, resource.WatchEvent_MODIFIED: diff --git a/pkg/storage/unified/resource/cdk_appender.go b/pkg/storage/unified/resource/cdk_appender.go index e97ade7dcd4..214feea7502 100644 --- a/pkg/storage/unified/resource/cdk_appender.go +++ b/pkg/storage/unified/resource/cdk_appender.go @@ -5,14 +5,12 @@ import ( context "context" "fmt" "io" - "math/rand" "sort" "strconv" "strings" "sync" "time" - "github.com/bwmarrin/snowflake" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" "gocloud.dev/blob" @@ -28,12 +26,7 @@ type CDKAppenderOptions struct { Bucket *blob.Bucket RootFolder string - // When running in a cluster, each node should have a different ID - // This is used for snowflake generation and log identification - NodeID int64 - - // Get the next ResourceVersion. When not set, this will default to snowflake IDs - NextResourceVersion func() int64 + NextResourceVersion NextResourceVersion } func NewCDKAppendingStore(ctx context.Context, opts CDKAppenderOptions) (AppendingStore, error) { @@ -56,19 +49,9 @@ func NewCDKAppendingStore(ctx context.Context, opts CDKAppenderOptions) (Appendi return nil, fmt.Errorf("the root folder does not exist") } - // This is not totally safe when running in HA + // This is not safe when running in HA! if opts.NextResourceVersion == nil { - if opts.NodeID == 0 { - opts.NodeID = rand.Int63n(1024) - } - eventNode, err := snowflake.NewNode(opts.NodeID) - if err != nil { - return nil, apierrors.NewInternalError( - fmt.Errorf("error initializing snowflake id generator :: %w", err)) - } - opts.NextResourceVersion = func() int64 { - return eventNode.Generate().Int64() - } + opts.NextResourceVersion = newResourceVersionCounter(time.Now().UnixMilli()) } return &cdkAppender{ @@ -83,7 +66,7 @@ type cdkAppender struct { tracer trace.Tracer bucket *blob.Bucket root string - nextRV func() int64 + nextRV NextResourceVersion mutex sync.Mutex // Typically one... the server wrapper diff --git a/pkg/storage/unified/resource/event.go b/pkg/storage/unified/resource/event.go index 558cfd35862..fbc2f76c60f 100644 --- a/pkg/storage/unified/resource/event.go +++ b/pkg/storage/unified/resource/event.go @@ -10,7 +10,6 @@ import ( ) type WriteEvent struct { - EventID int64 Type WatchEvent_Type // ADDED, MODIFIED, DELETED Key *ResourceKey // the request key PreviousRV int64 // only for Update+Delete @@ -84,7 +83,6 @@ func newEventFromBytes(value, oldValue []byte) (*writeEventBuilder, error) { } func (b *writeEventBuilder) toEvent() (event WriteEvent, err error) { - event.EventID = b.EventID event.Key = b.Key event.Type = b.Type event.ObjectOld = b.OldMeta diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index 9a1e40b543c..ef9cc1ef892 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -6,11 +6,9 @@ import ( "errors" "fmt" "log/slog" - "math/rand" "sync" "time" - "github.com/bwmarrin/snowflake" "github.com/google/uuid" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" @@ -82,13 +80,6 @@ type ResourceServerOptions struct { // OTel tracer Tracer trace.Tracer - // When running in a cluster, each node should have a different ID - // This is used for snowflake generation and log identification - NodeID int64 - - // Get the next EventID. When not set, this will default to snowflake IDs - NextEventID func() int64 - // Real storage backend Store AppendingStore @@ -117,20 +108,6 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { opts.Tracer = noop.NewTracerProvider().Tracer("resource-server") } - if opts.NextEventID == nil { - if opts.NodeID == 0 { - opts.NodeID = rand.Int63n(1024) - } - eventNode, err := snowflake.NewNode(opts.NodeID) - if err != nil { - return nil, apierrors.NewInternalError( - fmt.Errorf("error initializing snowflake id generator :: %w", err)) - } - opts.NextEventID = func() int64 { - return eventNode.Generate().Int64() - } - } - if opts.Store == nil { return nil, fmt.Errorf("missing AppendingStore implementation") } @@ -160,7 +137,6 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { return &server{ tracer: opts.Tracer, log: slog.Default().With("logger", "resource-server"), - nextEventID: opts.NextEventID, store: opts.Store, search: opts.Search, diagnostics: opts.Diagnostics, @@ -177,7 +153,6 @@ var _ ResourceServer = &server{} type server struct { tracer trace.Tracer log *slog.Logger - nextEventID func() int64 store AppendingStore search ResourceIndexServer blob BlobStore @@ -239,7 +214,6 @@ func (s *server) newEventBuilder(ctx context.Context, key *ResourceKey, value, o if err != nil { return nil, err } - event.EventID = s.nextEventID() event.Key = key event.Requester, err = identity.GetRequester(ctx) if err != nil { @@ -460,7 +434,6 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons now := metav1.NewTime(time.UnixMilli(s.now())) event := WriteEvent{ - EventID: s.nextEventID(), Key: req.Key, Type: WatchEvent_DELETED, PreviousRV: latest.ResourceVersion, diff --git a/pkg/storage/unified/sqlnext/sql_resources.go b/pkg/storage/unified/sqlnext/sql_resources.go index e2320bb67e6..1a35fb4bfc0 100644 --- a/pkg/storage/unified/sqlnext/sql_resources.go +++ b/pkg/storage/unified/sqlnext/sql_resources.go @@ -44,7 +44,6 @@ func ProvideSQLResourceServer(db db.EntityDBInterface, tracer tracing.Tracer) (r return resource.NewResourceServer(resource.ResourceServerOptions{ Tracer: tracer, Store: store, - NodeID: 234, // from config? used for snowflake ID Diagnostics: store, Lifecycle: store, })