From cc2dd8b20b775ed44dbee3e3dca9b48d1e2625c6 Mon Sep 17 00:00:00 2001 From: Silvan Date: Sun, 31 Dec 2023 15:30:25 +0100 Subject: [PATCH] fix(eventstore): increase performance on push (#7125) --- cmd/setup/19.go | 26 ++++++++++++++++++++++ cmd/setup/19.sql | 1 + cmd/setup/config.go | 1 + cmd/setup/setup.go | 3 +++ internal/eventstore/v3/sequence.go | 4 ++-- internal/eventstore/v3/sequence_test.go | 6 ++--- internal/eventstore/v3/sequences_query.sql | 24 +++++--------------- 7 files changed, 42 insertions(+), 23 deletions(-) create mode 100644 cmd/setup/19.go create mode 100644 cmd/setup/19.sql diff --git a/cmd/setup/19.go b/cmd/setup/19.go new file mode 100644 index 0000000000..7919ef9ad9 --- /dev/null +++ b/cmd/setup/19.go @@ -0,0 +1,26 @@ +package setup + +import ( + "context" + _ "embed" + + "github.com/zitadel/zitadel/internal/database" +) + +var ( + //go:embed 19.sql + addCurrentSequencesIndex string +) + +type AddCurrentSequencesIndex struct { + dbClient *database.DB +} + +func (mig *AddCurrentSequencesIndex) Execute(ctx context.Context) error { + _, err := mig.dbClient.ExecContext(ctx, addCurrentSequencesIndex) + return err +} + +func (mig *AddCurrentSequencesIndex) String() string { + return "19_add_current_sequences_index" +} diff --git a/cmd/setup/19.sql b/cmd/setup/19.sql new file mode 100644 index 0000000000..0d690c9552 --- /dev/null +++ b/cmd/setup/19.sql @@ -0,0 +1 @@ +CREATE INDEX CONCURRENTLY IF NOT EXISTS events2_current_sequence ON eventstore.events2 ("sequence" DESC, aggregate_id, aggregate_type, instance_id); \ No newline at end of file diff --git a/cmd/setup/config.go b/cmd/setup/config.go index 92ca30c3b1..84e7903dcf 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -76,6 +76,7 @@ type Steps struct { s16UniqueConstraintsLower *UniqueConstraintToLower s17AddOffsetToUniqueConstraints *AddOffsetToCurrentStates s18AddLowerFieldsToLoginNames *AddLowerFieldsToLoginNames + s19AddCurrentStatesIndex *AddCurrentSequencesIndex } type encryptionKeyConfig struct { diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index 476068fea3..4fc3f64481 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -109,6 +109,7 @@ func Setup(config *Config, steps *Steps, masterKey string) { steps.s16UniqueConstraintsLower = &UniqueConstraintToLower{dbClient: queryDBClient} steps.s17AddOffsetToUniqueConstraints = &AddOffsetToCurrentStates{dbClient: queryDBClient} steps.s18AddLowerFieldsToLoginNames = &AddLowerFieldsToLoginNames{dbClient: queryDBClient} + steps.s19AddCurrentStatesIndex = &AddCurrentSequencesIndex{dbClient: queryDBClient} err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil) logging.OnError(err).Fatal("unable to start projections") @@ -153,6 +154,8 @@ func Setup(config *Config, steps *Steps, masterKey string) { logging.WithFields("name", steps.s16UniqueConstraintsLower.String()).OnError(err).Fatal("migration failed") err = migration.Migrate(ctx, eventstoreClient, steps.s17AddOffsetToUniqueConstraints) logging.WithFields("name", steps.s17AddOffsetToUniqueConstraints.String()).OnError(err).Fatal("migration failed") + err = migration.Migrate(ctx, eventstoreClient, steps.s19AddCurrentStatesIndex) + logging.WithFields("name", steps.s19AddCurrentStatesIndex.String()).OnError(err).Fatal("migration failed") for _, repeatableStep := range repeatableSteps { err = migration.Migrate(ctx, eventstoreClient, repeatableStep) diff --git a/internal/eventstore/v3/sequence.go b/internal/eventstore/v3/sequence.go index 994645eab2..8d84ef4755 100644 --- a/internal/eventstore/v3/sequence.go +++ b/internal/eventstore/v3/sequence.go @@ -26,7 +26,7 @@ func latestSequences(ctx context.Context, tx *sql.Tx, commands []eventstore.Comm sequences := commandsToSequences(ctx, commands) conditions, args := sequencesToSql(sequences) - rows, err := tx.QueryContext(ctx, fmt.Sprintf(latestSequencesStmt, strings.Join(conditions, " OR ")), args...) + rows, err := tx.QueryContext(ctx, fmt.Sprintf(latestSequencesStmt, strings.Join(conditions, " UNION ALL ")), args...) if err != nil { return nil, zerrors.ThrowInternal(err, "V3-5jU5z", "Errors.Internal") } @@ -92,7 +92,7 @@ func sequencesToSql(sequences []*latestSequence) (conditions []string, args []an conditions = make([]string, len(sequences)) for i, sequence := range sequences { - conditions[i] = fmt.Sprintf("(instance_id = $%d AND aggregate_type = $%d AND aggregate_id = $%d)", + conditions[i] = fmt.Sprintf(`(SELECT instance_id, aggregate_type, aggregate_id, "sequence" FROM eventstore.events2 WHERE instance_id = $%d AND aggregate_type = $%d AND aggregate_id = $%d ORDER BY "sequence" DESC LIMIT 1)`, i*argsPerCondition+1, i*argsPerCondition+2, i*argsPerCondition+3, diff --git a/internal/eventstore/v3/sequence_test.go b/internal/eventstore/v3/sequence_test.go index 55ee73831d..d755c0dbd2 100644 --- a/internal/eventstore/v3/sequence_test.go +++ b/internal/eventstore/v3/sequence_test.go @@ -247,7 +247,7 @@ func Test_sequencesToSql(t *testing.T) { }, }, wantConditions: []string{ - "(instance_id = $1 AND aggregate_type = $2 AND aggregate_id = $3)", + `(SELECT instance_id, aggregate_type, aggregate_id, "sequence" FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND aggregate_id = $3 ORDER BY "sequence" DESC LIMIT 1)`, }, wantArgs: []any{ "instance", @@ -266,8 +266,8 @@ func Test_sequencesToSql(t *testing.T) { }, }, wantConditions: []string{ - "(instance_id = $1 AND aggregate_type = $2 AND aggregate_id = $3)", - "(instance_id = $4 AND aggregate_type = $5 AND aggregate_id = $6)", + `(SELECT instance_id, aggregate_type, aggregate_id, "sequence" FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND aggregate_id = $3 ORDER BY "sequence" DESC LIMIT 1)`, + `(SELECT instance_id, aggregate_type, aggregate_id, "sequence" FROM eventstore.events2 WHERE instance_id = $4 AND aggregate_type = $5 AND aggregate_id = $6 ORDER BY "sequence" DESC LIMIT 1)`, }, wantArgs: []any{ "instance", diff --git a/internal/eventstore/v3/sequences_query.sql b/internal/eventstore/v3/sequences_query.sql index fb164013dc..468a275253 100644 --- a/internal/eventstore/v3/sequences_query.sql +++ b/internal/eventstore/v3/sequences_query.sql @@ -1,17 +1,5 @@ -with existing as ( - SELECT - instance_id - , aggregate_type - , aggregate_id - , MAX("sequence") "sequence" - FROM - eventstore.events2 existing - WHERE - %s - GROUP BY - instance_id - , aggregate_type - , aggregate_id +WITH existing AS ( + %s ) SELECT e.instance_id , e.owner @@ -23,8 +11,8 @@ FROM JOIN existing ON - e.instance_id = existing.instance_id - AND e.aggregate_type = existing.aggregate_type - AND e.aggregate_id = existing.aggregate_id - AND e.sequence = existing.sequence + e.instance_id = existing.instance_id + AND e.aggregate_type = existing.aggregate_type + AND e.aggregate_id = existing.aggregate_id + AND e.sequence = existing.sequence FOR UPDATE; \ No newline at end of file