remove eventID

This commit is contained in:
Ryan McKinley 2024-07-02 14:08:34 -07:00
parent abacd9aef2
commit 8c1f60aa8b
5 changed files with 5 additions and 52 deletions

View File

@ -54,7 +54,7 @@ func (a *dashboardSqlAccess) WriteEvent(ctx context.Context, event resource.Writ
case resource.WatchEvent_DELETED:
{
_, _, err = a.DeleteDashboard(ctx, info.OrgID, event.Key.Name)
rv = event.EventID
//rv = ???
}
// The difference depends on embedded internal ID
case resource.WatchEvent_ADDED, resource.WatchEvent_MODIFIED:

View File

@ -5,14 +5,12 @@ import (
context "context"
"fmt"
"io"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/bwmarrin/snowflake"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"gocloud.dev/blob"
@ -28,12 +26,7 @@ type CDKAppenderOptions struct {
Bucket *blob.Bucket
RootFolder string
// When running in a cluster, each node should have a different ID
// This is used for snowflake generation and log identification
NodeID int64
// Get the next ResourceVersion. When not set, this will default to snowflake IDs
NextResourceVersion func() int64
NextResourceVersion NextResourceVersion
}
func NewCDKAppendingStore(ctx context.Context, opts CDKAppenderOptions) (AppendingStore, error) {
@ -56,19 +49,9 @@ func NewCDKAppendingStore(ctx context.Context, opts CDKAppenderOptions) (Appendi
return nil, fmt.Errorf("the root folder does not exist")
}
// This is not totally safe when running in HA
// This is not safe when running in HA!
if opts.NextResourceVersion == nil {
if opts.NodeID == 0 {
opts.NodeID = rand.Int63n(1024)
}
eventNode, err := snowflake.NewNode(opts.NodeID)
if err != nil {
return nil, apierrors.NewInternalError(
fmt.Errorf("error initializing snowflake id generator :: %w", err))
}
opts.NextResourceVersion = func() int64 {
return eventNode.Generate().Int64()
}
opts.NextResourceVersion = newResourceVersionCounter(time.Now().UnixMilli())
}
return &cdkAppender{
@ -83,7 +66,7 @@ type cdkAppender struct {
tracer trace.Tracer
bucket *blob.Bucket
root string
nextRV func() int64
nextRV NextResourceVersion
mutex sync.Mutex
// Typically one... the server wrapper

View File

@ -10,7 +10,6 @@ import (
)
type WriteEvent struct {
EventID int64
Type WatchEvent_Type // ADDED, MODIFIED, DELETED
Key *ResourceKey // the request key
PreviousRV int64 // only for Update+Delete
@ -84,7 +83,6 @@ func newEventFromBytes(value, oldValue []byte) (*writeEventBuilder, error) {
}
func (b *writeEventBuilder) toEvent() (event WriteEvent, err error) {
event.EventID = b.EventID
event.Key = b.Key
event.Type = b.Type
event.ObjectOld = b.OldMeta

View File

@ -6,11 +6,9 @@ import (
"errors"
"fmt"
"log/slog"
"math/rand"
"sync"
"time"
"github.com/bwmarrin/snowflake"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
@ -82,13 +80,6 @@ type ResourceServerOptions struct {
// OTel tracer
Tracer trace.Tracer
// When running in a cluster, each node should have a different ID
// This is used for snowflake generation and log identification
NodeID int64
// Get the next EventID. When not set, this will default to snowflake IDs
NextEventID func() int64
// Real storage backend
Store AppendingStore
@ -117,20 +108,6 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
opts.Tracer = noop.NewTracerProvider().Tracer("resource-server")
}
if opts.NextEventID == nil {
if opts.NodeID == 0 {
opts.NodeID = rand.Int63n(1024)
}
eventNode, err := snowflake.NewNode(opts.NodeID)
if err != nil {
return nil, apierrors.NewInternalError(
fmt.Errorf("error initializing snowflake id generator :: %w", err))
}
opts.NextEventID = func() int64 {
return eventNode.Generate().Int64()
}
}
if opts.Store == nil {
return nil, fmt.Errorf("missing AppendingStore implementation")
}
@ -160,7 +137,6 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
return &server{
tracer: opts.Tracer,
log: slog.Default().With("logger", "resource-server"),
nextEventID: opts.NextEventID,
store: opts.Store,
search: opts.Search,
diagnostics: opts.Diagnostics,
@ -177,7 +153,6 @@ var _ ResourceServer = &server{}
type server struct {
tracer trace.Tracer
log *slog.Logger
nextEventID func() int64
store AppendingStore
search ResourceIndexServer
blob BlobStore
@ -239,7 +214,6 @@ func (s *server) newEventBuilder(ctx context.Context, key *ResourceKey, value, o
if err != nil {
return nil, err
}
event.EventID = s.nextEventID()
event.Key = key
event.Requester, err = identity.GetRequester(ctx)
if err != nil {
@ -460,7 +434,6 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons
now := metav1.NewTime(time.UnixMilli(s.now()))
event := WriteEvent{
EventID: s.nextEventID(),
Key: req.Key,
Type: WatchEvent_DELETED,
PreviousRV: latest.ResourceVersion,

View File

@ -44,7 +44,6 @@ func ProvideSQLResourceServer(db db.EntityDBInterface, tracer tracing.Tracer) (r
return resource.NewResourceServer(resource.ResourceServerOptions{
Tracer: tracer,
Store: store,
NodeID: 234, // from config? used for snowflake ID
Diagnostics: store,
Lifecycle: store,
})