From 7ad37ff4e43ed200de1b8122bb910c6251923c9a Mon Sep 17 00:00:00 2001 From: Georges Chaudy Date: Mon, 29 Jul 2024 21:49:54 +0200 Subject: [PATCH] UnifiedStorage: ensure list from history works across all 3 database backends (#91134) * Ensure List is sorted by resource version DESC * Ensure List is sorted by resource version DESC * fix list in the past for postgres --- .../sql/data/resource_history_list.sql | 31 ++- .../unified/sql/test/integration_test.go | 177 ++++++++++-------- .../resource_history_list_mysql_sqlite.sql | 15 +- 3 files changed, 129 insertions(+), 94 deletions(-) diff --git a/pkg/storage/unified/sql/data/resource_history_list.sql b/pkg/storage/unified/sql/data/resource_history_list.sql index 9974863a5bb..ec15b144809 100644 --- a/pkg/storage/unified/sql/data/resource_history_list.sql +++ b/pkg/storage/unified/sql/data/resource_history_list.sql @@ -1,9 +1,9 @@ SELECT kv.{{ .Ident "resource_version" | .Into .Response.ResourceVersion }}, - {{ .Ident "value" | .Into .Response.Value }} + kv.{{ .Ident "value" | .Into .Response.Value }} FROM {{ .Ident "resource_history" }} as kv - JOIN ( - SELECT {{ .Ident "guid" }}, max({{ .Ident "resource_version" }}) AS {{ .Ident "resource_version" }} + INNER JOIN ( + SELECT {{ .Ident "namespace" }}, {{ .Ident "group" }}, {{ .Ident "resource" }}, {{ .Ident "name" }}, max({{ .Ident "resource_version" }}) AS {{ .Ident "resource_version" }} FROM {{ .Ident "resource_history" }} AS mkv WHERE 1 = 1 AND {{ .Ident "resource_version" }} <= {{ .Arg .Request.ResourceVersion }} @@ -23,10 +23,29 @@ SELECT {{ end }} GROUP BY mkv.{{ .Ident "namespace" }}, mkv.{{ .Ident "group" }}, mkv.{{ .Ident "resource" }}, mkv.{{ .Ident "name" }} ) AS maxkv - ON maxkv.{{ .Ident "guid" }} = kv.{{ .Ident "guid" }} + ON + maxkv.{{ .Ident "resource_version" }} = kv.{{ .Ident "resource_version" }} + AND maxkv.{{ .Ident "namespace" }} = kv.{{ .Ident "namespace" }} + AND maxkv.{{ .Ident "group" }} = kv.{{ .Ident "group" }} + AND maxkv.{{ .Ident "resource" }} = kv.{{ .Ident "resource" }} + AND maxkv.{{ .Ident "name" }} = kv.{{ .Ident "name" }} WHERE kv.{{ .Ident "action" }} != 3 - ORDER BY kv.{{ .Ident "resource_version" }} ASC + {{ if and .Request.Options .Request.Options.Key }} + {{ if .Request.Options.Key.Namespace }} + AND kv.{{ .Ident "namespace" }} = {{ .Arg .Request.Options.Key.Namespace }} + {{ end }} + {{ if .Request.Options.Key.Group }} + AND kv.{{ .Ident "group" }} = {{ .Arg .Request.Options.Key.Group }} + {{ end }} + {{ if .Request.Options.Key.Resource }} + AND kv.{{ .Ident "resource" }} = {{ .Arg .Request.Options.Key.Resource }} + {{ end }} + {{ if .Request.Options.Key.Name }} + AND kv.{{ .Ident "name" }} = {{ .Arg .Request.Options.Key.Name }} + {{ end }} + {{ end }} + ORDER BY kv.{{ .Ident "resource_version" }} DESC {{ if (gt .Request.Limit 0) }} - LIMIT {{ .Arg .Request.Offset }}, {{ .Arg .Request.Limit }} + LIMIT {{ .Arg .Request.Limit }} OFFSET {{ .Arg .Request.Offset }} {{ end }} ; diff --git a/pkg/storage/unified/sql/test/integration_test.go b/pkg/storage/unified/sql/test/integration_test.go index e153bc530ea..fd5500fe4a3 100644 --- a/pkg/storage/unified/sql/test/integration_test.go +++ b/pkg/storage/unified/sql/test/integration_test.go @@ -4,7 +4,6 @@ import ( "context" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/trace/noop" "google.golang.org/grpc" @@ -51,47 +50,48 @@ func newServer(t *testing.T) sql.Backend { return ret } -func TestBackendHappyPath(t *testing.T) { - // TODO: stop this from breaking enterprise builds https://drone.grafana.net/grafana/grafana-enterprise/73536/2/8 - t.Skip("test is breaking enterprise builds") +func TestIntegrationBackendHappyPath(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } ctx := testutil.NewDefaultTestContext(t) store := newServer(t) stream, err := store.WatchWriteEvents(ctx) - assert.NoError(t, err) + require.NoError(t, err) t.Run("Add 3 resources", func(t *testing.T) { rv, err := writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED) - assert.NoError(t, err) - assert.Equal(t, int64(1), rv) + require.NoError(t, err) + require.Equal(t, int64(1), rv) rv, err = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED) - assert.NoError(t, err) - assert.Equal(t, int64(2), rv) + require.NoError(t, err) + require.Equal(t, int64(2), rv) rv, err = writeEvent(ctx, store, "item3", resource.WatchEvent_ADDED) - assert.NoError(t, err) - assert.Equal(t, int64(3), rv) + require.NoError(t, err) + require.Equal(t, int64(3), rv) }) t.Run("Update item2", func(t *testing.T) { rv, err := writeEvent(ctx, store, "item2", resource.WatchEvent_MODIFIED) - assert.NoError(t, err) - assert.Equal(t, int64(4), rv) + require.NoError(t, err) + require.Equal(t, int64(4), rv) }) t.Run("Delete item1", func(t *testing.T) { rv, err := writeEvent(ctx, store, "item1", resource.WatchEvent_DELETED) - assert.NoError(t, err) - assert.Equal(t, int64(5), rv) + require.NoError(t, err) + require.Equal(t, int64(5), rv) }) t.Run("Read latest item 2", func(t *testing.T) { resp, err := store.Read(ctx, &resource.ReadRequest{Key: resourceKey("item2")}) - assert.NoError(t, err) - assert.Equal(t, int64(4), resp.ResourceVersion) - assert.Equal(t, "item2 MODIFIED", string(resp.Value)) + require.NoError(t, err) + require.Equal(t, int64(4), resp.ResourceVersion) + require.Equal(t, "item2 MODIFIED", string(resp.Value)) }) t.Run("Read early verion of item2", func(t *testing.T) { @@ -99,9 +99,9 @@ func TestBackendHappyPath(t *testing.T) { Key: resourceKey("item2"), ResourceVersion: 3, // item2 was created at rv=2 and updated at rv=4 }) - assert.NoError(t, err) - assert.Equal(t, int64(2), resp.ResourceVersion) - assert.Equal(t, "item2 ADDED", string(resp.Value)) + require.NoError(t, err) + require.Equal(t, int64(2), resp.ResourceVersion) + require.Equal(t, "item2 ADDED", string(resp.Value)) }) t.Run("PrepareList latest", func(t *testing.T) { @@ -114,63 +114,67 @@ func TestBackendHappyPath(t *testing.T) { }, }, }) - assert.NoError(t, err) - assert.Len(t, resp.Items, 2) - assert.Equal(t, "item2 MODIFIED", string(resp.Items[0].Value)) - assert.Equal(t, "item3 ADDED", string(resp.Items[1].Value)) - assert.Equal(t, int64(5), resp.ResourceVersion) + require.NoError(t, err) + require.Len(t, resp.Items, 2) + require.Equal(t, "item2 MODIFIED", string(resp.Items[0].Value)) + require.Equal(t, "item3 ADDED", string(resp.Items[1].Value)) + require.Equal(t, int64(5), resp.ResourceVersion) }) t.Run("Watch events", func(t *testing.T) { event := <-stream - assert.Equal(t, "item1", event.Key.Name) - assert.Equal(t, int64(1), event.ResourceVersion) - assert.Equal(t, resource.WatchEvent_ADDED, event.Type) + require.Equal(t, "item1", event.Key.Name) + require.Equal(t, int64(1), event.ResourceVersion) + require.Equal(t, resource.WatchEvent_ADDED, event.Type) event = <-stream - assert.Equal(t, "item2", event.Key.Name) - assert.Equal(t, int64(2), event.ResourceVersion) - assert.Equal(t, resource.WatchEvent_ADDED, event.Type) + require.Equal(t, "item2", event.Key.Name) + require.Equal(t, int64(2), event.ResourceVersion) + require.Equal(t, resource.WatchEvent_ADDED, event.Type) event = <-stream - assert.Equal(t, "item3", event.Key.Name) - assert.Equal(t, int64(3), event.ResourceVersion) - assert.Equal(t, resource.WatchEvent_ADDED, event.Type) + require.Equal(t, "item3", event.Key.Name) + require.Equal(t, int64(3), event.ResourceVersion) + require.Equal(t, resource.WatchEvent_ADDED, event.Type) event = <-stream - assert.Equal(t, "item2", event.Key.Name) - assert.Equal(t, int64(4), event.ResourceVersion) - assert.Equal(t, resource.WatchEvent_MODIFIED, event.Type) + require.Equal(t, "item2", event.Key.Name) + require.Equal(t, int64(4), event.ResourceVersion) + require.Equal(t, resource.WatchEvent_MODIFIED, event.Type) event = <-stream - assert.Equal(t, "item1", event.Key.Name) - assert.Equal(t, int64(5), event.ResourceVersion) - assert.Equal(t, resource.WatchEvent_DELETED, event.Type) + require.Equal(t, "item1", event.Key.Name) + require.Equal(t, int64(5), event.ResourceVersion) + require.Equal(t, resource.WatchEvent_DELETED, event.Type) }) } -func TestBackendWatchWriteEventsFromLastest(t *testing.T) { - // TODO: stop this from breaking enterprise builds https://drone.grafana.net/grafana/grafana-enterprise/73536/2/8 - t.Skip("test is breaking enterprise builds") +func TestIntegrationBackendWatchWriteEventsFromLastest(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + ctx := testutil.NewDefaultTestContext(t) store := newServer(t) // Create a few resources before initing the watch _, err := writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED) - assert.NoError(t, err) + require.NoError(t, err) // Start the watch stream, err := store.WatchWriteEvents(ctx) - assert.NoError(t, err) + require.NoError(t, err) // Create one more event _, err = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED) - assert.NoError(t, err) - assert.Equal(t, "item2", (<-stream).Key.Name) + require.NoError(t, err) + require.Equal(t, "item2", (<-stream).Key.Name) } -func TestBackendPrepareList(t *testing.T) { - // TODO: stop this from breaking enterprise builds https://drone.grafana.net/grafana/grafana-enterprise/73536/2/8 - t.Skip("test is breaking enterprise builds") +func TestIntegrationBackendPrepareList(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + ctx := testutil.NewDefaultTestContext(t) store := newServer(t) @@ -192,9 +196,16 @@ func TestBackendPrepareList(t *testing.T) { }, }, }) - assert.NoError(t, err) - assert.Len(t, res.Items, 5) - assert.Empty(t, res.NextPageToken) + require.NoError(t, err) + require.Len(t, res.Items, 5) + // should be sorted by resource version DESC + require.Equal(t, "item6 ADDED", string(res.Items[0].Value)) + require.Equal(t, "item2 MODIFIED", string(res.Items[1].Value)) + require.Equal(t, "item5 ADDED", string(res.Items[2].Value)) + require.Equal(t, "item4 ADDED", string(res.Items[3].Value)) + require.Equal(t, "item1 ADDED", string(res.Items[4].Value)) + + require.Empty(t, res.NextPageToken) }) t.Run("list latest first page ", func(t *testing.T) { @@ -207,12 +218,15 @@ func TestBackendPrepareList(t *testing.T) { }, }, }) - assert.NoError(t, err) - assert.Len(t, res.Items, 3) + require.NoError(t, err) + require.Len(t, res.Items, 3) continueToken, err := sql.GetContinueToken(res.NextPageToken) - assert.NoError(t, err) - assert.Equal(t, int64(8), continueToken.ResourceVersion) - assert.Equal(t, int64(3), continueToken.StartOffset) + require.NoError(t, err) + require.Equal(t, "item6 ADDED", string(res.Items[0].Value)) + require.Equal(t, "item2 MODIFIED", string(res.Items[1].Value)) + require.Equal(t, "item5 ADDED", string(res.Items[2].Value)) + require.Equal(t, int64(8), continueToken.ResourceVersion) + require.Equal(t, int64(3), continueToken.StartOffset) }) t.Run("list at revision", func(t *testing.T) { @@ -225,13 +239,13 @@ func TestBackendPrepareList(t *testing.T) { }, }, }) - assert.NoError(t, err) - assert.Len(t, res.Items, 4) - assert.Equal(t, "item1 ADDED", string(res.Items[0].Value)) - assert.Equal(t, "item2 ADDED", string(res.Items[1].Value)) - assert.Equal(t, "item3 ADDED", string(res.Items[2].Value)) - assert.Equal(t, "item4 ADDED", string(res.Items[3].Value)) - assert.Empty(t, res.NextPageToken) + require.NoError(t, err) + require.Len(t, res.Items, 4) + require.Equal(t, "item4 ADDED", string(res.Items[0].Value)) + require.Equal(t, "item3 ADDED", string(res.Items[1].Value)) + require.Equal(t, "item2 ADDED", string(res.Items[2].Value)) + require.Equal(t, "item1 ADDED", string(res.Items[3].Value)) + require.Empty(t, res.NextPageToken) }) t.Run("fetch first page at revision with limit", func(t *testing.T) { @@ -245,16 +259,17 @@ func TestBackendPrepareList(t *testing.T) { }, }, }) - assert.NoError(t, err) - assert.Len(t, res.Items, 3) - assert.Equal(t, "item1 ADDED", string(res.Items[0].Value)) - assert.Equal(t, "item4 ADDED", string(res.Items[1].Value)) - assert.Equal(t, "item5 ADDED", string(res.Items[2].Value)) + require.NoError(t, err) + require.Len(t, res.Items, 3) + t.Log(res.Items) + require.Equal(t, "item2 MODIFIED", string(res.Items[0].Value)) + require.Equal(t, "item5 ADDED", string(res.Items[1].Value)) + require.Equal(t, "item4 ADDED", string(res.Items[2].Value)) continueToken, err := sql.GetContinueToken(res.NextPageToken) - assert.NoError(t, err) - assert.Equal(t, int64(7), continueToken.ResourceVersion) - assert.Equal(t, int64(3), continueToken.StartOffset) + require.NoError(t, err) + require.Equal(t, int64(7), continueToken.ResourceVersion) + require.Equal(t, int64(3), continueToken.StartOffset) }) t.Run("fetch second page at revision", func(t *testing.T) { @@ -272,15 +287,15 @@ func TestBackendPrepareList(t *testing.T) { }, }, }) - assert.NoError(t, err) - assert.Len(t, res.Items, 2) - assert.Equal(t, "item5 ADDED", string(res.Items[0].Value)) - assert.Equal(t, "item2 MODIFIED", string(res.Items[1].Value)) + require.NoError(t, err) + require.Len(t, res.Items, 2) + require.Equal(t, "item5 ADDED", string(res.Items[0].Value)) + require.Equal(t, "item4 ADDED", string(res.Items[1].Value)) continueToken, err = sql.GetContinueToken(res.NextPageToken) - assert.NoError(t, err) - assert.Equal(t, int64(8), continueToken.ResourceVersion) - assert.Equal(t, int64(4), continueToken.StartOffset) + require.NoError(t, err) + require.Equal(t, int64(8), continueToken.ResourceVersion) + require.Equal(t, int64(4), continueToken.StartOffset) }) } func TestClientServer(t *testing.T) { diff --git a/pkg/storage/unified/sql/testdata/resource_history_list_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_history_list_mysql_sqlite.sql index f8fcb9535b4..893e78a7df7 100644 --- a/pkg/storage/unified/sql/testdata/resource_history_list_mysql_sqlite.sql +++ b/pkg/storage/unified/sql/testdata/resource_history_list_mysql_sqlite.sql @@ -1,12 +1,13 @@ -SELECT kv."resource_version", "value" +SELECT kv."resource_version", kv."value" FROM "resource_history" as kv -JOIN ( - SELECT "guid", max("resource_version") AS "resource_version" +INNER JOIN ( + SELECT "namespace", "group", "resource", "name", max("resource_version") AS "resource_version" FROM "resource_history" AS mkv WHERE 1 = 1 AND "resource_version" <= ? AND "namespace" = ? GROUP BY mkv."namespace", mkv."group", mkv."resource", mkv."name" -) AS maxkv ON maxkv."guid" = kv."guid" -WHERE kv."action" != 3 -ORDER BY kv."resource_version" ASC -LIMIT ?, ? +) AS maxkv +ON maxkv."resource_version" = kv."resource_version" AND maxkv."namespace" = kv."namespace" AND maxkv."group" = kv."group" AND maxkv."resource" = kv."resource" AND maxkv."name" = kv."name" +WHERE kv."action" != 3 AND kv."namespace" = ? +ORDER BY kv."resource_version" DESC +LIMIT ? OFFSET ? ;