diff --git a/go.mod b/go.mod index 3f070d31c07..c2fb95f1d94 100644 --- a/go.mod +++ b/go.mod @@ -123,7 +123,7 @@ require ( gopkg.in/mail.v2 v2.3.1 // @grafana/backend-platform gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // @grafana/alerting-squad-backend - xorm.io/builder v0.3.6 // @grafana/backend-platform + xorm.io/builder v0.3.6 // indirect; @grafana/backend-platform xorm.io/core v0.7.3 // @grafana/backend-platform xorm.io/xorm v0.8.2 // @grafana/alerting-squad-backend ) @@ -175,7 +175,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.1-0.20191002090509-6af20e3a5340 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect - github.com/hashicorp/go-multierror v1.1.1 // @grafana/alerting-squad + github.com/hashicorp/go-multierror v1.1.1 // indirect; @grafana/alerting-squad github.com/hashicorp/go-sockaddr v1.0.2 // indirect github.com/hashicorp/golang-lru v0.6.0 // indirect github.com/hashicorp/yamux v0.1.1 // indirect @@ -299,12 +299,13 @@ require ( github.com/NYTimes/gziphandler v1.1.1 // indirect github.com/agext/levenshtein v1.2.1 // indirect github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect - github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect + github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/bmatcuk/doublestar v1.1.1 // indirect github.com/buildkite/yaml v2.1.0+incompatible // indirect + github.com/bwmarrin/snowflake v0.3.0 // @grafan/grafana-app-platform-squad github.com/centrifugal/protocol v0.10.0 // indirect github.com/cloudflare/circl v1.3.3 // indirect github.com/cockroachdb/errors v1.9.1 // indirect @@ -335,8 +336,7 @@ require ( github.com/gogo/googleapis v1.4.1 // indirect github.com/gogo/status v1.1.1 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect - github.com/google/cel-go v0.12.6 // indirect - github.com/google/gnostic v0.6.9 // indirect + github.com/google/cel-go v0.16.1 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/s2a-go v0.1.7 // indirect @@ -396,20 +396,20 @@ require ( github.com/yuin/gopher-lua v1.1.0 // indirect github.com/zclconf/go-cty v1.13.0 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect - go.etcd.io/etcd/api/v3 v3.5.7 // indirect - go.etcd.io/etcd/client/pkg/v3 v3.5.7 // indirect - go.etcd.io/etcd/client/v3 v3.5.7 // indirect + go.etcd.io/etcd/api/v3 v3.5.9 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect + go.etcd.io/etcd/client/v3 v3.5.9 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect go.opentelemetry.io/otel/metric v1.19.0 // indirect go.starlark.net v0.0.0-20221020143700-22309ac47eac // indirect - go.uber.org/multierr v1.10.0 // indirect + go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/term v0.13.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a // indirect gopkg.in/fsnotify/fsnotify.v1 v1.4.7 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect k8s.io/api v0.27.1 // indirect k8s.io/kms v0.27.1 // indirect lukechampine.com/uint128 v1.2.0 // indirect @@ -425,7 +425,7 @@ require ( sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect - sigs.k8s.io/yaml v1.3.0 // indirect + sigs.k8s.io/yaml v1.3.0 // @grafana-app-platform-squad ) require ( @@ -450,7 +450,7 @@ require ( github.com/blugelabs/ice v1.0.0 // indirect github.com/caio/go-tdigest v3.1.0+incompatible // indirect github.com/chromedp/cdproto v0.0.0-20220208224320-6efb837e6bc2 // indirect - github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-semver v0.3.1 // indirect github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 // indirect github.com/docker/docker v23.0.4+incompatible // @grafana/grafana-delivery github.com/elazarl/goproxy v0.0.0-20230731152917-f99041a5c027 // indirect @@ -482,6 +482,8 @@ require ( gopkg.in/warnings.v0 v0.1.2 // indirect ) +require github.com/google/gnostic v0.6.9 // indirect + // Use fork of crewjam/saml with fixes for some issues until changes get merged into upstream replace github.com/crewjam/saml => github.com/grafana/saml v0.4.15-0.20231025143828-a6c0e9b86a4c diff --git a/go.sum b/go.sum index bb09dc0c15c..37493c11f44 100644 --- a/go.sum +++ b/go.sum @@ -699,8 +699,8 @@ github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHG github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 h1:kFOfPq6dUM1hTo4JG6LR5AXSUEsOjtdm0kw0FtQtMJA= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 h1:yL7+Jz0jTC6yykIK/Wh74gnTJnrGr5AyrNMXuA0gves= -github.com/antlr/antlr4/runtime/Go/antlr v1.4.10/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= +github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18= +github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM= github.com/apache/arrow/go/arrow v0.0.0-20210223225224-5bea62493d91/go.mod h1:c9sxoIT3YgLxH4UhLOCKaBlEojuMhVYpk4Ntv3opUTQ= github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0= github.com/apache/arrow/go/v13 v13.0.0 h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUuSVcWk= @@ -844,6 +844,8 @@ github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx2 github.com/buildkite/yaml v2.1.0+incompatible h1:xirI+ql5GzfikVNDmt+yeiXpf/v1Gt03qXTtT5WXdr8= github.com/buildkite/yaml v2.1.0+incompatible/go.mod h1:UoU8vbcwu1+vjZq01+KrpSeLBgQQIjL/H7Y6KwikUrI= github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= +github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0= +github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE= github.com/caio/go-tdigest v3.1.0+incompatible h1:uoVMJ3Q5lXmVLCCqaMGHLBWnbGoN6Lpu7OAUPR60cds= github.com/caio/go-tdigest v3.1.0+incompatible/go.mod h1:sHQM/ubZStBUmF1WbB8FAm8q9GjDajLC5T7ydxE3JHI= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= @@ -927,8 +929,9 @@ github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkE github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= -github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4= +github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= @@ -1667,8 +1670,8 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= -github.com/google/cel-go v0.12.6 h1:kjeKudqV0OygrAqA9fX6J55S8gj+Jre2tckIm5RoG4M= -github.com/google/cel-go v0.12.6/go.mod h1:Jk7ljRzLBhkmiAwBoUxB1sZSCVBAzkqPF25olK/iRDw= +github.com/google/cel-go v0.16.1 h1:3hZfSNiAU3KOiNtxuFXVp5WFy4hf/Ly3Sa4/7F8SXNo= +github.com/google/cel-go v0.16.1/go.mod h1:HXZKzB0LXqer5lHHgfWAnlYwJaQBDKMjxjulNQzhwhY= github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v23.1.21+incompatible h1:bUqzx/MXCDxuS0hRJL2EfjyZL3uQrPbMocUa8zGqsTA= @@ -2946,19 +2949,19 @@ go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 h1:VcrIfasaLFkyjk6KNlXQSzO+B0 go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= -go.etcd.io/etcd/api/v3 v3.5.7 h1:sbcmosSVesNrWOJ58ZQFitHMdncusIifYcrBfwrlJSY= -go.etcd.io/etcd/api/v3 v3.5.7/go.mod h1:9qew1gCdDDLu+VwmeG+iFpL+QlpHTo7iubavdVDgCAA= +go.etcd.io/etcd/api/v3 v3.5.9 h1:4wSsluwyTbGGmyjJktOf3wFQoTBIURXHnq9n/G/JQHs= +go.etcd.io/etcd/api/v3 v3.5.9/go.mod h1:uyAal843mC8uUVSLWz6eHa/d971iDGnCRpmKd2Z+X8k= go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= -go.etcd.io/etcd/client/pkg/v3 v3.5.7 h1:y3kf5Gbp4e4q7egZdn5T7W9TSHUvkClN6u+Rq9mEOmg= -go.etcd.io/etcd/client/pkg/v3 v3.5.7/go.mod h1:o0Abi1MK86iad3YrWhgUsbGx1pmTS+hrORWc2CamuhY= +go.etcd.io/etcd/client/pkg/v3 v3.5.9 h1:oidDC4+YEuSIQbsR94rY9gur91UPL6DnxDCIYd2IGsE= +go.etcd.io/etcd/client/pkg/v3 v3.5.9/go.mod h1:y+CzeSmkMpWN2Jyu1npecjB9BBnABxGM4pN8cGuJeL4= go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ= go.etcd.io/etcd/client/v2 v2.305.4/go.mod h1:Ud+VUwIi9/uQHOMA+4ekToJ12lTxlv0zB/+DHwTGEbU= go.etcd.io/etcd/client/v2 v2.305.7 h1:AELPkjNR3/igjbO7CjyF1fPuVPjrblliiKj+Y6xSGOU= go.etcd.io/etcd/client/v3 v3.5.0/go.mod h1:AIKXXVX/DQXtfTEqBryiLTUXwON+GuvO6Z7lLS/oTh0= go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= -go.etcd.io/etcd/client/v3 v3.5.7 h1:u/OhpiuCgYY8awOHlhIhmGIGpxfBU/GZBUP3m/3/Iz4= -go.etcd.io/etcd/client/v3 v3.5.7/go.mod h1:sOWmj9DZUMyAngS7QQwCyAXXAL6WhgTOPLNS/NabQgw= +go.etcd.io/etcd/client/v3 v3.5.9 h1:r5xghnU7CwbUxD/fbUtRyJGaYNfDun8sp/gTr1hew6E= +go.etcd.io/etcd/client/v3 v3.5.9/go.mod h1:i/Eo5LrZ5IKqpbtpPDuaUnDOUv471oDg8cjQaUr2MbA= go.etcd.io/etcd/pkg/v3 v3.5.7 h1:obOzeVwerFwZ9trMWapU/VjDcYUJb5OfgC1zqEGWO/0= go.etcd.io/etcd/raft/v3 v3.5.7 h1:aN79qxLmV3SvIq84aNTliYGmjwsW6NqJSnqmI1HLJKc= go.etcd.io/etcd/server/v3 v3.5.7 h1:BTBD8IJUV7YFgsczZMHhMTS67XuA4KpRquL0MFOJGRk= @@ -3049,8 +3052,8 @@ go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKY go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= -go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= -go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= @@ -4086,8 +4089,8 @@ gopkg.in/mail.v2 v2.0.0-20180731213649-a0242b2233b4/go.mod h1:htwXN1Qh09vZJ1NVKx gopkg.in/mail.v2 v2.3.1 h1:WYFn/oANrAGP2C0dcV6/pbkPzv8yGzqTjPmTeO7qoXk= gopkg.in/mail.v2 v2.3.1/go.mod h1:htwXN1Qh09vZJ1NVKxQqHPBaCBbzKhp5GzuJEA4VJWw= gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= -gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= -gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/square/go-jose.v2 v2.1.9/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/square/go-jose.v2 v2.2.2/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= diff --git a/pkg/apis/playlist/v0alpha1/doc.go b/pkg/apis/playlist/v0alpha1/doc.go index 525126b0b7e..9e0c3053c16 100644 --- a/pkg/apis/playlist/v0alpha1/doc.go +++ b/pkg/apis/playlist/v0alpha1/doc.go @@ -1,5 +1,5 @@ // +k8s:deepcopy-gen=package // +k8s:openapi-gen=true -// +groupName=playlist.grafana.io +// +groupName=playlist.grafana.app package v0alpha1 // import "github.com/grafana/grafana/pkg/apis/playlist/v0alpha1" diff --git a/pkg/services/grafana-apiserver/README.md b/pkg/services/grafana-apiserver/README.md index 8beebc853e9..0d40f3a6b3f 100644 --- a/pkg/services/grafana-apiserver/README.md +++ b/pkg/services/grafana-apiserver/README.md @@ -23,13 +23,36 @@ Start `etcd`: make devenv sources=etcd ``` -Add etcd server to `custom.ini`: +Set storage type and etcd server address in `custom.ini`: ```ini [grafana-apiserver] +storage_type = etcd etcd_servers = 127.0.0.1:2379 ``` +## Enable dual write to JSON files: + +Set storage type: + +```ini +[grafana-apiserver] +storage_type = file +``` + +Objects will be written to disk under the `{data.path}/grafana-apiserver/` directory. + +For example: + +``` +data/grafana-apiserver +├── grafana.kubeconfig +└── playlist.grafana.app + └── playlists + └── default + └── hi.json +``` + ### `kubectl` access From the root of the Grafanaa repository, run the following: diff --git a/pkg/services/grafana-apiserver/config.go b/pkg/services/grafana-apiserver/config.go index 75916af55e5..be7eaf1254a 100644 --- a/pkg/services/grafana-apiserver/config.go +++ b/pkg/services/grafana-apiserver/config.go @@ -19,6 +19,8 @@ type config struct { host string apiURL string + storageType StorageType + etcdServers []string dataPath string @@ -52,6 +54,7 @@ func newConfig(cfg *setting.Cfg) *config { host: host, logLevel: cfg.SectionWithEnvOverrides("grafana-apiserver").Key("log_level").MustInt(defaultLogLevel), etcdServers: cfg.SectionWithEnvOverrides("grafana-apiserver").Key("etcd_servers").Strings(","), + storageType: StorageType(cfg.SectionWithEnvOverrides("grafana-apiserver").Key("storage_type").MustString(string(StorageTypeLegacy))), apiURL: apiURL, } } diff --git a/pkg/services/grafana-apiserver/config_test.go b/pkg/services/grafana-apiserver/config_test.go index aa0eb211d25..78c0cf52622 100644 --- a/pkg/services/grafana-apiserver/config_test.go +++ b/pkg/services/grafana-apiserver/config_test.go @@ -27,6 +27,7 @@ func TestNewConfig(t *testing.T) { expected := &config{ enabled: true, devMode: false, + storageType: StorageTypeLegacy, etcdServers: []string{"http://localhost:2379"}, apiURL: "http://test:4000", ip: net.ParseIP("10.0.0.1"), diff --git a/pkg/services/grafana-apiserver/service.go b/pkg/services/grafana-apiserver/service.go index 932b936e01d..3eadba4b15c 100644 --- a/pkg/services/grafana-apiserver/service.go +++ b/pkg/services/grafana-apiserver/service.go @@ -33,10 +33,19 @@ import ( "github.com/grafana/grafana/pkg/modules" "github.com/grafana/grafana/pkg/registry" contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model" + filestorage "github.com/grafana/grafana/pkg/services/grafana-apiserver/storage/file" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/web" ) +type StorageType string + +const ( + StorageTypeFile StorageType = "file" + StorageTypeEtcd StorageType = "etcd" + StorageTypeLegacy StorageType = "legacy" +) + var ( _ Service = (*service)(nil) _ RestConfigProvider = (*service)(nil) @@ -106,8 +115,8 @@ func ProvideService( rr: rr, stopCh: make(chan struct{}), builders: []APIGroupBuilder{}, - tracing: tracing, authorizer: authz, + tracing: tracing, } // This will be used when running as a dskit service @@ -184,17 +193,9 @@ func (s *service) start(ctx context.Context) error { o.SecureServing.BindPort = s.config.port o.Authentication.RemoteKubeConfigFileOptional = true o.Authorization.RemoteKubeConfigFileOptional = true - o.Etcd.StorageConfig.Transport.ServerList = s.config.etcdServers o.Admission = nil o.CoreAPI = nil - if len(o.Etcd.StorageConfig.Transport.ServerList) == 0 { - o.Etcd = nil - } - - if err := o.Validate(); len(err) > 0 { - return err[0] - } serverConfig := genericapiserver.NewRecommendedConfig(Codecs) serverConfig.ExternalAddress = s.config.host @@ -224,7 +225,11 @@ func (s *service) start(ctx context.Context) error { } } - if o.Etcd != nil { + if s.config.storageType == StorageTypeEtcd { + o.Etcd.StorageConfig.Transport.ServerList = s.config.etcdServers + if err := o.Etcd.Validate(); len(err) > 0 { + return err[0] + } if err := o.Etcd.Complete(serverConfig.Config.StorageObjectCountTracker, serverConfig.Config.DrainedNotify(), serverConfig.Config.AddPostStartHook); err != nil { return err } @@ -233,6 +238,10 @@ func (s *service) start(ctx context.Context) error { } } + if s.config.storageType == StorageTypeFile { + serverConfig.RESTOptionsGetter = filestorage.NewRESTOptionsGetter(s.config.dataPath, o.Etcd.StorageConfig) + } + serverConfig.Authorization.Authorizer = s.authorizer // Add OpenAPI specs for each group+version diff --git a/pkg/services/grafana-apiserver/storage/file/file.go b/pkg/services/grafana-apiserver/storage/file/file.go new file mode 100644 index 00000000000..d04dea3365a --- /dev/null +++ b/pkg/services/grafana-apiserver/storage/file/file.go @@ -0,0 +1,519 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/kubernetes-sigs/apiserver-runtime/blob/main/pkg/experimental/storage/filepath/jsonfile_rest.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Kubernetes Authors. + +package file + +import ( + "context" + "errors" + "fmt" + "path/filepath" + "reflect" + "strings" + "time" + + "github.com/bwmarrin/snowflake" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/storagebackend" + "k8s.io/apiserver/pkg/storage/storagebackend/factory" + "k8s.io/client-go/tools/cache" +) + +const MaxUpdateAttempts = 30 + +var _ storage.Interface = (*Storage)(nil) + +// Storage implements storage.Interface and storage resources as JSON files on disk. +type Storage struct { + root string + gr schema.GroupResource + codec runtime.Codec + keyFunc func(obj runtime.Object) (string, error) + newFunc func() runtime.Object + newListFunc func() runtime.Object + getAttrsFunc storage.AttrFunc + trigger storage.IndexerFuncs + indexers *cache.Indexers + + watchSet *WatchSet +} + +// ErrFileNotExists means the file doesn't actually exist. +var ErrFileNotExists = fmt.Errorf("file doesn't exist") + +// ErrNamespaceNotExists means the directory for the namespace doesn't actually exist. +var ErrNamespaceNotExists = errors.New("namespace does not exist") + +func getResourceVersion() (*uint64, error) { + node, err := snowflake.NewNode(1) + if err != nil { + return nil, err + } + + snowflakeNumber := node.Generate().Int64() + resourceVersion := uint64(snowflakeNumber) + return &resourceVersion, nil +} + +// NewStorage instantiates a new Storage. +func NewStorage( + config *storagebackend.ConfigForResource, + resourcePrefix string, + keyFunc func(obj runtime.Object) (string, error), + newFunc func() runtime.Object, + newListFunc func() runtime.Object, + getAttrsFunc storage.AttrFunc, + trigger storage.IndexerFuncs, + indexers *cache.Indexers, +) (storage.Interface, factory.DestroyFunc, error) { + if err := ensureDir(resourcePrefix); err != nil { + return nil, func() {}, fmt.Errorf("could not establish a writable directory at path=%s", resourcePrefix) + } + ws := NewWatchSet() + return &Storage{ + root: resourcePrefix, + gr: config.GroupResource, + codec: config.Codec, + keyFunc: keyFunc, + newFunc: newFunc, + newListFunc: newListFunc, + getAttrsFunc: getAttrsFunc, + trigger: trigger, + indexers: indexers, + + watchSet: ws, + }, func() { + ws.cleanupWatchers() + }, nil +} + +// Returns Versioner associated with this storage. +func (s *Storage) Versioner() storage.Versioner { + return &storage.APIObjectVersioner{} +} + +// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live +// in seconds (0 means forever). If no error is returned and out is not nil, out will be +// set to the read value from database. +func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, out runtime.Object, ttl uint64) error { + filename := s.filePath(key) + if exists(filename) { + return storage.NewKeyExistsError(key, 0) + } + + dirname := filepath.Dir(filename) + if err := ensureDir(dirname); err != nil { + return err + } + + generatedRV, err := getResourceVersion() + if err != nil { + return err + } + + metaObj, err := meta.Accessor(obj) + if err != nil { + return err + } + metaObj.SetSelfLink("") + + if err := s.Versioner().UpdateObject(obj, *generatedRV); err != nil { + return err + } + + if err := writeFile(s.codec, filename, obj); err != nil { + return err + } + + // set a timer to delete the file after ttl seconds + if ttl > 0 { + time.AfterFunc(time.Second*time.Duration(ttl), func() { + if err := s.Delete(ctx, key, s.newFunc(), &storage.Preconditions{}, func(ctx context.Context, obj runtime.Object) error { return nil }, obj); err != nil { + panic(err) + } + }) + } + + if err := s.Get(ctx, key, storage.GetOptions{}, out); err != nil { + return err + } + + s.watchSet.notifyWatchers(watch.Event{ + Object: out.DeepCopyObject(), + Type: watch.Added, + }) + + return nil +} + +// Delete removes the specified key and returns the value that existed at that spot. +// If key didn't exist, it will return NotFound storage error. +// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the +// current version of the object to avoid read operation from storage to get it. +// However, the implementations have to retry in case suggestion is stale. +func (s *Storage) Delete( + ctx context.Context, + key string, + out runtime.Object, + preconditions *storage.Preconditions, + validateDeletion storage.ValidateObjectFunc, + cachedExistingObject runtime.Object, +) error { + filename := s.filePath(key) + var currentState runtime.Object + var stateIsCurrent bool + if cachedExistingObject != nil { + currentState = cachedExistingObject + } else { + getOptions := storage.GetOptions{} + if preconditions != nil && preconditions.ResourceVersion != nil { + getOptions.ResourceVersion = *preconditions.ResourceVersion + } + if err := s.Get(ctx, key, getOptions, currentState); err == nil { + stateIsCurrent = true + } + } + + for { + if preconditions != nil { + if err := preconditions.Check(key, out); err != nil { + if stateIsCurrent { + return err + } + + // If the state is not current, we need to re-read the state and try again. + if err := s.Get(ctx, key, storage.GetOptions{}, currentState); err != nil { + return err + } + stateIsCurrent = true + continue + } + } + + if err := validateDeletion(ctx, out); err != nil { + if stateIsCurrent { + return err + } + + // If the state is not current, we need to re-read the state and try again. + if err := s.Get(ctx, key, storage.GetOptions{}, currentState); err == nil { + stateIsCurrent = true + } + continue + } + + if err := s.Get(ctx, key, storage.GetOptions{}, out); err != nil { + return err + } + + generatedRV, err := getResourceVersion() + if err != nil { + return err + } + if err := s.Versioner().UpdateObject(out, *generatedRV); err != nil { + return err + } + + if err := deleteFile(filename); err != nil { + return err + } + + s.watchSet.notifyWatchers(watch.Event{ + Object: out.DeepCopyObject(), + Type: watch.Deleted, + }) + + return nil + } +} + +// Watch begins watching the specified key. Events are decoded into API objects, +// and any items selected by 'p' are sent down to returned watch.Interface. +// resourceVersion may be used to specify what version to begin watching, +// which should be the current resourceVersion, and no longer rv+1 +// (e.g. reconnecting without missing any updates). +// If resource version is "0", this interface will get current object at given key +// and send it in an "ADDED" event, before watch starts. +func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + p := opts.Predicate + jw := s.watchSet.newWatch() + + listObj := s.newListFunc() + + if opts.ResourceVersion == "0" { + err := s.GetList(ctx, key, opts, listObj) + if err != nil { + return nil, err + } + } + + initEvents := make([]watch.Event, 0) + listPtr, err := meta.GetItemsPtr(listObj) + if err != nil { + return nil, err + } + v, err := conversion.EnforcePtr(listPtr) + if err != nil { + return nil, err + } + + if v.IsNil() { + jw.Start(p, initEvents) + return jw, nil + } + + for _, obj := range v.Elem().Interface().([]runtime.Object) { + initEvents = append(initEvents, watch.Event{ + Type: watch.Added, + Object: obj.DeepCopyObject(), + }) + } + jw.Start(p, initEvents) + return jw, nil +} + +// Get unmarshals object found at key into objPtr. On a not found error, will either +// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'. +// Treats empty responses and nil response nodes exactly like a not found error. +// The returned contents may be delayed, but it is guaranteed that they will +// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'. +func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { + filename := s.filePath(key) + if !exists(filename) { + if opts.IgnoreNotFound { + return runtime.SetZeroValue(objPtr) + } + rv, err := s.Versioner().ParseResourceVersion(opts.ResourceVersion) + if err != nil { + return err + } + return storage.NewKeyNotFoundError(key, int64(rv)) + } + + obj, err := readFile(s.codec, filename, func() runtime.Object { + return objPtr + }) + if err != nil { + return err + } + + currentVersion, err := s.Versioner().ObjectResourceVersion(obj) + if err != nil { + return err + } + + if err = s.validateMinimumResourceVersion(opts.ResourceVersion, currentVersion); err != nil { + return err + } + + return nil +} + +// GetList unmarshalls objects found at key into a *List api object (an object +// that satisfies runtime.IsList definition). +// If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive' +// is true, 'key' is used as a prefix. +// The returned contents may be delayed, but it is guaranteed that they will +// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'. +func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + generatedRV, err := getResourceVersion() + if err != nil { + return err + } + remainingItems := int64(0) + if err := s.Versioner().UpdateList(listObj, *generatedRV, "", &remainingItems); err != nil { + return err + } + + if opts.ResourceVersion != "" { + resourceVersionInt, err := s.Versioner().ParseResourceVersion(opts.ResourceVersion) + if err != nil { + return err + } + if err := s.Versioner().UpdateList(listObj, resourceVersionInt, "", &remainingItems); err != nil { + return err + } + } + + objs, err := readDirRecursive(s.codec, key, s.newFunc) + if err != nil { + return err + } + + listPtr, err := meta.GetItemsPtr(listObj) + if err != nil { + return err + } + v, err := conversion.EnforcePtr(listPtr) + if err != nil { + return err + } + + for _, obj := range objs { + currentVersion, err := s.Versioner().ObjectResourceVersion(obj) + if err != nil { + return err + } + + if err = s.validateMinimumResourceVersion(opts.ResourceVersion, currentVersion); err != nil { + continue + } + + ok, err := opts.Predicate.Matches(obj) + if err == nil && ok { + v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) + } + } + + return nil +} + +// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'destination') +// retrying the update until success if there is index conflict. +// Note that object passed to tryUpdate may change across invocations of tryUpdate() if +// other writers are simultaneously updating it, so tryUpdate() needs to take into account +// the current contents of the object when deciding how the update object should look. +// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false +// else `destination` will be set to the zero value of it's type. +// If the eventual successful invocation of `tryUpdate` returns an output with the same serialized +// contents as the input, it won't perform any update, but instead set `destination` to an object with those +// contents. +// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the +// current version of the object to avoid read operation from storage to get it. +// However, the implementations have to retry in case suggestion is stale. +func (s *Storage) GuaranteedUpdate( + ctx context.Context, + key string, + destination runtime.Object, + ignoreNotFound bool, + preconditions *storage.Preconditions, + tryUpdate storage.UpdateFunc, + cachedExistingObject runtime.Object, +) error { + var res storage.ResponseMeta + for attempt := 1; attempt <= MaxUpdateAttempts; attempt = attempt + 1 { + var ( + filename = s.filePath(key) + + obj runtime.Object + err error + created bool + ) + + if !exists(filename) && !ignoreNotFound { + return apierrors.NewNotFound(s.gr, s.nameFromKey(key)) + } + + obj, err = readFile(s.codec, filename, s.newFunc) + if err != nil { + // fallback to new object if the file is not found + obj = s.newFunc() + created = true + } + + if err := preconditions.Check(key, obj); err != nil { + if attempt >= MaxUpdateAttempts { + return fmt.Errorf("precondition failed: %w", err) + } + continue + } + + updatedObj, _, err := tryUpdate(obj, res) + if err != nil { + if attempt >= MaxUpdateAttempts { + return err + } + continue + } + + unchanged, err := isUnchanged(s.codec, obj, updatedObj) + if err != nil { + return err + } + + if unchanged { + u, err := conversion.EnforcePtr(updatedObj) + if err != nil { + return fmt.Errorf("unable to enforce updated object pointer: %w", err) + } + d, err := conversion.EnforcePtr(destination) + if err != nil { + return fmt.Errorf("unable to enforce destination pointer: %w", err) + } + d.Set(u) + return nil + } + + generatedRV, err := getResourceVersion() + if err != nil { + return err + } + if err := s.Versioner().UpdateObject(updatedObj, *generatedRV); err != nil { + return err + } + if err := writeFile(s.codec, filename, updatedObj); err != nil { + return err + } + eventType := watch.Modified + if created { + eventType = watch.Added + } + s.watchSet.notifyWatchers(watch.Event{ + Object: updatedObj.DeepCopyObject(), + Type: eventType, + }) + } + return nil +} + +// Count returns number of different entries under the key (generally being path prefix). +func (s *Storage) Count(key string) (int64, error) { + return 0, nil +} + +// RequestWatchProgress requests the a watch stream progress status be sent in the +// watch response stream as soon as possible. +// Used for monitor watch progress even if watching resources with no changes. +// +// If watch is lagging, progress status might: +// * be pointing to stale resource version. Use etcd KV request to get linearizable resource version. +// * not be delivered at all. It's recommended to poll request progress periodically. +// +// Note: Only watches with matching context grpc metadata will be notified. +// https://github.com/kubernetes/kubernetes/blob/9325a57125e8502941d1b0c7379c4bb80a678d5c/vendor/go.etcd.io/etcd/client/v3/watch.go#L1037-L1042 +// +// TODO: Remove when storage.Interface will be separate from etc3.store. +// Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache. +func (s *Storage) RequestWatchProgress(ctx context.Context) error { + return nil +} + +// validateMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is +// greater than the most recent actualRevision available from storage. +func (s *Storage) validateMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error { + if minimumResourceVersion == "" { + return nil + } + minimumRV, err := s.Versioner().ParseResourceVersion(minimumResourceVersion) + if err != nil { + return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) + } + // Enforce the storage.Interface guarantee that the resource version of the returned data + // "will be at least 'resourceVersion'". + if minimumRV > actualRevision { + return storage.NewTooLargeResourceVersionError(minimumRV, actualRevision, 0) + } + return nil +} + +func (s *Storage) nameFromKey(key string) string { + return strings.Replace(key, s.root+"/", "", 1) +} diff --git a/pkg/services/grafana-apiserver/storage/file/restoptions.go b/pkg/services/grafana-apiserver/storage/file/restoptions.go new file mode 100644 index 00000000000..3a4352623f9 --- /dev/null +++ b/pkg/services/grafana-apiserver/storage/file/restoptions.go @@ -0,0 +1,61 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package file + +import ( + "path" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/storage/storagebackend" + flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" +) + +var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil) + +type RESTOptionsGetter struct { + path string + original storagebackend.Config +} + +func NewRESTOptionsGetter(path string, originalStorageConfig storagebackend.Config) *RESTOptionsGetter { + if path == "" { + path = "/tmp/grafana-apiserver" + } + + return &RESTOptionsGetter{path: path, original: originalStorageConfig} +} + +func (r *RESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { + storageConfig := &storagebackend.ConfigForResource{ + Config: storagebackend.Config{ + Type: "file", + Prefix: r.path, + Transport: storagebackend.TransportConfig{}, + Paging: false, + Codec: r.original.Codec, + EncodeVersioner: r.original.EncodeVersioner, + Transformer: r.original.Transformer, + CompactionInterval: 0, + CountMetricPollPeriod: 0, + DBMetricPollInterval: 0, + HealthcheckTimeout: 0, + ReadycheckTimeout: 0, + StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(), + }, + GroupResource: resource, + } + + ret := generic.RESTOptions{ + StorageConfig: storageConfig, + Decorator: NewStorage, + DeleteCollectionWorkers: 0, + EnableGarbageCollection: false, + ResourcePrefix: path.Join(storageConfig.Prefix, resource.Group, resource.Resource), + CountMetricPollPeriod: 1 * time.Second, + StorageObjectCountTracker: storageConfig.Config.StorageObjectCountTracker, + } + + return ret, nil +} diff --git a/pkg/services/grafana-apiserver/storage/file/util.go b/pkg/services/grafana-apiserver/storage/file/util.go new file mode 100644 index 00000000000..137138e782a --- /dev/null +++ b/pkg/services/grafana-apiserver/storage/file/util.go @@ -0,0 +1,91 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/kubernetes-sigs/apiserver-runtime/blob/main/pkg/experimental/storage/filepath/jsonfile_rest.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Kubernetes Authors. + +package file + +import ( + "bytes" + "os" + "path/filepath" + + "k8s.io/apimachinery/pkg/runtime" +) + +func (s *Storage) filePath(key string) string { + return key + ".json" +} + +func writeFile(codec runtime.Codec, path string, obj runtime.Object) error { + buf := new(bytes.Buffer) + if err := codec.Encode(obj, buf); err != nil { + return err + } + return os.WriteFile(path, buf.Bytes(), 0600) +} + +func readFile(codec runtime.Codec, path string, newFunc func() runtime.Object) (runtime.Object, error) { + content, err := os.ReadFile(filepath.Clean(path)) + if err != nil { + return nil, err + } + newObj := newFunc() + decodedObj, _, err := codec.Decode(content, nil, newObj) + if err != nil { + return nil, err + } + return decodedObj, nil +} + +func readDirRecursive(codec runtime.Codec, path string, newFunc func() runtime.Object) ([]runtime.Object, error) { + var objs []runtime.Object + err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() || filepath.Ext(path) != ".json" { + return nil + } + obj, err := readFile(codec, path, newFunc) + if err != nil { + return err + } + objs = append(objs, obj) + return nil + }) + if err != nil { + return nil, err + } + return objs, nil +} + +func deleteFile(path string) error { + return os.Remove(path) +} + +func exists(filepath string) bool { + _, err := os.Stat(filepath) + return err == nil +} + +func ensureDir(dirname string) error { + if !exists(dirname) { + return os.MkdirAll(dirname, 0700) + } + return nil +} + +func isUnchanged(codec runtime.Codec, obj runtime.Object, newObj runtime.Object) (bool, error) { + buf := new(bytes.Buffer) + if err := codec.Encode(obj, buf); err != nil { + return false, err + } + + newBuf := new(bytes.Buffer) + if err := codec.Encode(newObj, newBuf); err != nil { + return false, err + } + + return bytes.Equal(buf.Bytes(), newBuf.Bytes()), nil +} diff --git a/pkg/services/grafana-apiserver/storage/file/watchset.go b/pkg/services/grafana-apiserver/storage/file/watchset.go new file mode 100644 index 00000000000..b6569f125d6 --- /dev/null +++ b/pkg/services/grafana-apiserver/storage/file/watchset.go @@ -0,0 +1,103 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/tilt-dev/tilt-apiserver/blob/main/pkg/storage/filepath/watchset.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Kubernetes Authors. + +package file + +import ( + "sync" + + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" +) + +// Keeps track of which watches need to be notified +type WatchSet struct { + mu sync.RWMutex + nodes map[int]*watchNode + counter int +} + +func NewWatchSet() *WatchSet { + return &WatchSet{ + nodes: make(map[int]*watchNode, 20), + counter: 0, + } +} + +// Creates a new watch with a unique id, but +// does not start sending events to it until start() is called. +func (s *WatchSet) newWatch() *watchNode { + s.mu.Lock() + defer s.mu.Unlock() + + s.counter++ + + return &watchNode{ + id: s.counter, + s: s, + updateCh: make(chan watch.Event), + outCh: make(chan watch.Event), + } +} + +func (s *WatchSet) cleanupWatchers() { + // Doesn't protect the below access on nodes slice since it won't ever be modified during cleanup + for _, w := range s.nodes { + w.Stop() + } +} + +func (s *WatchSet) notifyWatchers(ev watch.Event) { + s.mu.RLock() + for _, w := range s.nodes { + w.updateCh <- ev + } + s.mu.RUnlock() +} + +type watchNode struct { + s *WatchSet + id int + updateCh chan watch.Event + outCh chan watch.Event +} + +// Start sending events to this watch. +func (w *watchNode) Start(p storage.SelectionPredicate, initEvents []watch.Event) { + w.s.mu.Lock() + w.s.nodes[w.id] = w + w.s.mu.Unlock() + + go func() { + for _, e := range initEvents { + w.outCh <- e + } + + for e := range w.updateCh { + ok, err := p.Matches(e.Object) + if err != nil { + continue + } + + if !ok { + continue + } + w.outCh <- e + } + close(w.outCh) + }() +} + +func (w *watchNode) Stop() { + w.s.mu.Lock() + delete(w.s.nodes, w.id) + w.s.mu.Unlock() + + close(w.updateCh) +} + +func (w *watchNode) ResultChan() <-chan watch.Event { + return w.outCh +}