mirror of
https://github.com/grafana/grafana.git
synced 2025-02-16 18:34:52 -06:00
UnifiedStorage: ensure list from history works across all 3 database backends (#91134)
* Ensure List is sorted by resource version DESC * Ensure List is sorted by resource version DESC * fix list in the past for postgres
This commit is contained in:
parent
62f67e38b8
commit
7ad37ff4e4
@ -1,9 +1,9 @@
|
||||
SELECT
|
||||
kv.{{ .Ident "resource_version" | .Into .Response.ResourceVersion }},
|
||||
{{ .Ident "value" | .Into .Response.Value }}
|
||||
kv.{{ .Ident "value" | .Into .Response.Value }}
|
||||
FROM {{ .Ident "resource_history" }} as kv
|
||||
JOIN (
|
||||
SELECT {{ .Ident "guid" }}, max({{ .Ident "resource_version" }}) AS {{ .Ident "resource_version" }}
|
||||
INNER JOIN (
|
||||
SELECT {{ .Ident "namespace" }}, {{ .Ident "group" }}, {{ .Ident "resource" }}, {{ .Ident "name" }}, max({{ .Ident "resource_version" }}) AS {{ .Ident "resource_version" }}
|
||||
FROM {{ .Ident "resource_history" }} AS mkv
|
||||
WHERE 1 = 1
|
||||
AND {{ .Ident "resource_version" }} <= {{ .Arg .Request.ResourceVersion }}
|
||||
@ -23,10 +23,29 @@ SELECT
|
||||
{{ end }}
|
||||
GROUP BY mkv.{{ .Ident "namespace" }}, mkv.{{ .Ident "group" }}, mkv.{{ .Ident "resource" }}, mkv.{{ .Ident "name" }}
|
||||
) AS maxkv
|
||||
ON maxkv.{{ .Ident "guid" }} = kv.{{ .Ident "guid" }}
|
||||
ON
|
||||
maxkv.{{ .Ident "resource_version" }} = kv.{{ .Ident "resource_version" }}
|
||||
AND maxkv.{{ .Ident "namespace" }} = kv.{{ .Ident "namespace" }}
|
||||
AND maxkv.{{ .Ident "group" }} = kv.{{ .Ident "group" }}
|
||||
AND maxkv.{{ .Ident "resource" }} = kv.{{ .Ident "resource" }}
|
||||
AND maxkv.{{ .Ident "name" }} = kv.{{ .Ident "name" }}
|
||||
WHERE kv.{{ .Ident "action" }} != 3
|
||||
ORDER BY kv.{{ .Ident "resource_version" }} ASC
|
||||
{{ if and .Request.Options .Request.Options.Key }}
|
||||
{{ if .Request.Options.Key.Namespace }}
|
||||
AND kv.{{ .Ident "namespace" }} = {{ .Arg .Request.Options.Key.Namespace }}
|
||||
{{ end }}
|
||||
{{ if .Request.Options.Key.Group }}
|
||||
AND kv.{{ .Ident "group" }} = {{ .Arg .Request.Options.Key.Group }}
|
||||
{{ end }}
|
||||
{{ if .Request.Options.Key.Resource }}
|
||||
AND kv.{{ .Ident "resource" }} = {{ .Arg .Request.Options.Key.Resource }}
|
||||
{{ end }}
|
||||
{{ if .Request.Options.Key.Name }}
|
||||
AND kv.{{ .Ident "name" }} = {{ .Arg .Request.Options.Key.Name }}
|
||||
{{ end }}
|
||||
{{ end }}
|
||||
ORDER BY kv.{{ .Ident "resource_version" }} DESC
|
||||
{{ if (gt .Request.Limit 0) }}
|
||||
LIMIT {{ .Arg .Request.Offset }}, {{ .Arg .Request.Limit }}
|
||||
LIMIT {{ .Arg .Request.Limit }} OFFSET {{ .Arg .Request.Offset }}
|
||||
{{ end }}
|
||||
;
|
||||
|
@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.opentelemetry.io/otel/trace/noop"
|
||||
"google.golang.org/grpc"
|
||||
@ -51,47 +50,48 @@ func newServer(t *testing.T) sql.Backend {
|
||||
return ret
|
||||
}
|
||||
|
||||
func TestBackendHappyPath(t *testing.T) {
|
||||
// TODO: stop this from breaking enterprise builds https://drone.grafana.net/grafana/grafana-enterprise/73536/2/8
|
||||
t.Skip("test is breaking enterprise builds")
|
||||
func TestIntegrationBackendHappyPath(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
|
||||
ctx := testutil.NewDefaultTestContext(t)
|
||||
store := newServer(t)
|
||||
|
||||
stream, err := store.WatchWriteEvents(ctx)
|
||||
assert.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("Add 3 resources", func(t *testing.T) {
|
||||
rv, err := writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(1), rv)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1), rv)
|
||||
|
||||
rv, err = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(2), rv)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(2), rv)
|
||||
|
||||
rv, err = writeEvent(ctx, store, "item3", resource.WatchEvent_ADDED)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(3), rv)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(3), rv)
|
||||
})
|
||||
|
||||
t.Run("Update item2", func(t *testing.T) {
|
||||
rv, err := writeEvent(ctx, store, "item2", resource.WatchEvent_MODIFIED)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(4), rv)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(4), rv)
|
||||
})
|
||||
|
||||
t.Run("Delete item1", func(t *testing.T) {
|
||||
rv, err := writeEvent(ctx, store, "item1", resource.WatchEvent_DELETED)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(5), rv)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(5), rv)
|
||||
})
|
||||
|
||||
t.Run("Read latest item 2", func(t *testing.T) {
|
||||
resp, err := store.Read(ctx, &resource.ReadRequest{Key: resourceKey("item2")})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(4), resp.ResourceVersion)
|
||||
assert.Equal(t, "item2 MODIFIED", string(resp.Value))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(4), resp.ResourceVersion)
|
||||
require.Equal(t, "item2 MODIFIED", string(resp.Value))
|
||||
})
|
||||
|
||||
t.Run("Read early verion of item2", func(t *testing.T) {
|
||||
@ -99,9 +99,9 @@ func TestBackendHappyPath(t *testing.T) {
|
||||
Key: resourceKey("item2"),
|
||||
ResourceVersion: 3, // item2 was created at rv=2 and updated at rv=4
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(2), resp.ResourceVersion)
|
||||
assert.Equal(t, "item2 ADDED", string(resp.Value))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(2), resp.ResourceVersion)
|
||||
require.Equal(t, "item2 ADDED", string(resp.Value))
|
||||
})
|
||||
|
||||
t.Run("PrepareList latest", func(t *testing.T) {
|
||||
@ -114,63 +114,67 @@ func TestBackendHappyPath(t *testing.T) {
|
||||
},
|
||||
},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, resp.Items, 2)
|
||||
assert.Equal(t, "item2 MODIFIED", string(resp.Items[0].Value))
|
||||
assert.Equal(t, "item3 ADDED", string(resp.Items[1].Value))
|
||||
assert.Equal(t, int64(5), resp.ResourceVersion)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, resp.Items, 2)
|
||||
require.Equal(t, "item2 MODIFIED", string(resp.Items[0].Value))
|
||||
require.Equal(t, "item3 ADDED", string(resp.Items[1].Value))
|
||||
require.Equal(t, int64(5), resp.ResourceVersion)
|
||||
})
|
||||
|
||||
t.Run("Watch events", func(t *testing.T) {
|
||||
event := <-stream
|
||||
assert.Equal(t, "item1", event.Key.Name)
|
||||
assert.Equal(t, int64(1), event.ResourceVersion)
|
||||
assert.Equal(t, resource.WatchEvent_ADDED, event.Type)
|
||||
require.Equal(t, "item1", event.Key.Name)
|
||||
require.Equal(t, int64(1), event.ResourceVersion)
|
||||
require.Equal(t, resource.WatchEvent_ADDED, event.Type)
|
||||
event = <-stream
|
||||
assert.Equal(t, "item2", event.Key.Name)
|
||||
assert.Equal(t, int64(2), event.ResourceVersion)
|
||||
assert.Equal(t, resource.WatchEvent_ADDED, event.Type)
|
||||
require.Equal(t, "item2", event.Key.Name)
|
||||
require.Equal(t, int64(2), event.ResourceVersion)
|
||||
require.Equal(t, resource.WatchEvent_ADDED, event.Type)
|
||||
|
||||
event = <-stream
|
||||
assert.Equal(t, "item3", event.Key.Name)
|
||||
assert.Equal(t, int64(3), event.ResourceVersion)
|
||||
assert.Equal(t, resource.WatchEvent_ADDED, event.Type)
|
||||
require.Equal(t, "item3", event.Key.Name)
|
||||
require.Equal(t, int64(3), event.ResourceVersion)
|
||||
require.Equal(t, resource.WatchEvent_ADDED, event.Type)
|
||||
|
||||
event = <-stream
|
||||
assert.Equal(t, "item2", event.Key.Name)
|
||||
assert.Equal(t, int64(4), event.ResourceVersion)
|
||||
assert.Equal(t, resource.WatchEvent_MODIFIED, event.Type)
|
||||
require.Equal(t, "item2", event.Key.Name)
|
||||
require.Equal(t, int64(4), event.ResourceVersion)
|
||||
require.Equal(t, resource.WatchEvent_MODIFIED, event.Type)
|
||||
|
||||
event = <-stream
|
||||
assert.Equal(t, "item1", event.Key.Name)
|
||||
assert.Equal(t, int64(5), event.ResourceVersion)
|
||||
assert.Equal(t, resource.WatchEvent_DELETED, event.Type)
|
||||
require.Equal(t, "item1", event.Key.Name)
|
||||
require.Equal(t, int64(5), event.ResourceVersion)
|
||||
require.Equal(t, resource.WatchEvent_DELETED, event.Type)
|
||||
})
|
||||
}
|
||||
|
||||
func TestBackendWatchWriteEventsFromLastest(t *testing.T) {
|
||||
// TODO: stop this from breaking enterprise builds https://drone.grafana.net/grafana/grafana-enterprise/73536/2/8
|
||||
t.Skip("test is breaking enterprise builds")
|
||||
func TestIntegrationBackendWatchWriteEventsFromLastest(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
|
||||
ctx := testutil.NewDefaultTestContext(t)
|
||||
store := newServer(t)
|
||||
|
||||
// Create a few resources before initing the watch
|
||||
_, err := writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED)
|
||||
assert.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Start the watch
|
||||
stream, err := store.WatchWriteEvents(ctx)
|
||||
assert.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create one more event
|
||||
_, err = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "item2", (<-stream).Key.Name)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "item2", (<-stream).Key.Name)
|
||||
}
|
||||
|
||||
func TestBackendPrepareList(t *testing.T) {
|
||||
// TODO: stop this from breaking enterprise builds https://drone.grafana.net/grafana/grafana-enterprise/73536/2/8
|
||||
t.Skip("test is breaking enterprise builds")
|
||||
func TestIntegrationBackendPrepareList(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
|
||||
ctx := testutil.NewDefaultTestContext(t)
|
||||
store := newServer(t)
|
||||
|
||||
@ -192,9 +196,16 @@ func TestBackendPrepareList(t *testing.T) {
|
||||
},
|
||||
},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, res.Items, 5)
|
||||
assert.Empty(t, res.NextPageToken)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res.Items, 5)
|
||||
// should be sorted by resource version DESC
|
||||
require.Equal(t, "item6 ADDED", string(res.Items[0].Value))
|
||||
require.Equal(t, "item2 MODIFIED", string(res.Items[1].Value))
|
||||
require.Equal(t, "item5 ADDED", string(res.Items[2].Value))
|
||||
require.Equal(t, "item4 ADDED", string(res.Items[3].Value))
|
||||
require.Equal(t, "item1 ADDED", string(res.Items[4].Value))
|
||||
|
||||
require.Empty(t, res.NextPageToken)
|
||||
})
|
||||
|
||||
t.Run("list latest first page ", func(t *testing.T) {
|
||||
@ -207,12 +218,15 @@ func TestBackendPrepareList(t *testing.T) {
|
||||
},
|
||||
},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, res.Items, 3)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res.Items, 3)
|
||||
continueToken, err := sql.GetContinueToken(res.NextPageToken)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(8), continueToken.ResourceVersion)
|
||||
assert.Equal(t, int64(3), continueToken.StartOffset)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "item6 ADDED", string(res.Items[0].Value))
|
||||
require.Equal(t, "item2 MODIFIED", string(res.Items[1].Value))
|
||||
require.Equal(t, "item5 ADDED", string(res.Items[2].Value))
|
||||
require.Equal(t, int64(8), continueToken.ResourceVersion)
|
||||
require.Equal(t, int64(3), continueToken.StartOffset)
|
||||
})
|
||||
|
||||
t.Run("list at revision", func(t *testing.T) {
|
||||
@ -225,13 +239,13 @@ func TestBackendPrepareList(t *testing.T) {
|
||||
},
|
||||
},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, res.Items, 4)
|
||||
assert.Equal(t, "item1 ADDED", string(res.Items[0].Value))
|
||||
assert.Equal(t, "item2 ADDED", string(res.Items[1].Value))
|
||||
assert.Equal(t, "item3 ADDED", string(res.Items[2].Value))
|
||||
assert.Equal(t, "item4 ADDED", string(res.Items[3].Value))
|
||||
assert.Empty(t, res.NextPageToken)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res.Items, 4)
|
||||
require.Equal(t, "item4 ADDED", string(res.Items[0].Value))
|
||||
require.Equal(t, "item3 ADDED", string(res.Items[1].Value))
|
||||
require.Equal(t, "item2 ADDED", string(res.Items[2].Value))
|
||||
require.Equal(t, "item1 ADDED", string(res.Items[3].Value))
|
||||
require.Empty(t, res.NextPageToken)
|
||||
})
|
||||
|
||||
t.Run("fetch first page at revision with limit", func(t *testing.T) {
|
||||
@ -245,16 +259,17 @@ func TestBackendPrepareList(t *testing.T) {
|
||||
},
|
||||
},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, res.Items, 3)
|
||||
assert.Equal(t, "item1 ADDED", string(res.Items[0].Value))
|
||||
assert.Equal(t, "item4 ADDED", string(res.Items[1].Value))
|
||||
assert.Equal(t, "item5 ADDED", string(res.Items[2].Value))
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res.Items, 3)
|
||||
t.Log(res.Items)
|
||||
require.Equal(t, "item2 MODIFIED", string(res.Items[0].Value))
|
||||
require.Equal(t, "item5 ADDED", string(res.Items[1].Value))
|
||||
require.Equal(t, "item4 ADDED", string(res.Items[2].Value))
|
||||
|
||||
continueToken, err := sql.GetContinueToken(res.NextPageToken)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(7), continueToken.ResourceVersion)
|
||||
assert.Equal(t, int64(3), continueToken.StartOffset)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(7), continueToken.ResourceVersion)
|
||||
require.Equal(t, int64(3), continueToken.StartOffset)
|
||||
})
|
||||
|
||||
t.Run("fetch second page at revision", func(t *testing.T) {
|
||||
@ -272,15 +287,15 @@ func TestBackendPrepareList(t *testing.T) {
|
||||
},
|
||||
},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, res.Items, 2)
|
||||
assert.Equal(t, "item5 ADDED", string(res.Items[0].Value))
|
||||
assert.Equal(t, "item2 MODIFIED", string(res.Items[1].Value))
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res.Items, 2)
|
||||
require.Equal(t, "item5 ADDED", string(res.Items[0].Value))
|
||||
require.Equal(t, "item4 ADDED", string(res.Items[1].Value))
|
||||
|
||||
continueToken, err = sql.GetContinueToken(res.NextPageToken)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(8), continueToken.ResourceVersion)
|
||||
assert.Equal(t, int64(4), continueToken.StartOffset)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(8), continueToken.ResourceVersion)
|
||||
require.Equal(t, int64(4), continueToken.StartOffset)
|
||||
})
|
||||
}
|
||||
func TestClientServer(t *testing.T) {
|
||||
|
@ -1,12 +1,13 @@
|
||||
SELECT kv."resource_version", "value"
|
||||
SELECT kv."resource_version", kv."value"
|
||||
FROM "resource_history" as kv
|
||||
JOIN (
|
||||
SELECT "guid", max("resource_version") AS "resource_version"
|
||||
INNER JOIN (
|
||||
SELECT "namespace", "group", "resource", "name", max("resource_version") AS "resource_version"
|
||||
FROM "resource_history" AS mkv
|
||||
WHERE 1 = 1 AND "resource_version" <= ? AND "namespace" = ?
|
||||
GROUP BY mkv."namespace", mkv."group", mkv."resource", mkv."name"
|
||||
) AS maxkv ON maxkv."guid" = kv."guid"
|
||||
WHERE kv."action" != 3
|
||||
ORDER BY kv."resource_version" ASC
|
||||
LIMIT ?, ?
|
||||
) AS maxkv
|
||||
ON maxkv."resource_version" = kv."resource_version" AND maxkv."namespace" = kv."namespace" AND maxkv."group" = kv."group" AND maxkv."resource" = kv."resource" AND maxkv."name" = kv."name"
|
||||
WHERE kv."action" != 3 AND kv."namespace" = ?
|
||||
ORDER BY kv."resource_version" DESC
|
||||
LIMIT ? OFFSET ?
|
||||
;
|
||||
|
Loading…
Reference in New Issue
Block a user