Storage Api: Adds traces (#85391)

- adds traces and improved logging to the unified storage server
- add a configurable logger to the gRPC server service
This commit is contained in:
owensmallwood
2024-04-16 08:30:51 -06:00
committed by GitHub
parent a12669951b
commit 8c8885ef23
10 changed files with 262 additions and 67 deletions

View File

@@ -104,6 +104,8 @@ address = "127.0.0.1:10000"
use_tls = false
cert_file =
key_file =
# this will log the request and response for each unary gRPC call
enable_logging = false
#################################### Database ############################
[database]
@@ -1837,4 +1839,4 @@ fetch_access_policy_timeout = 5s
# How long to wait for a request to create to delete an access policy to complete
delete_access_policy_timeout = 5s
# The domain name used to access cms
domain = grafana-dev.net
domain = grafana-dev.net

View File

@@ -251,7 +251,7 @@ func (s *service) start(ctx context.Context) error {
return err
}
storeServer, err := sqlstash.ProvideSQLEntityServer(eDB)
storeServer, err := sqlstash.ProvideSQLEntityServer(eDB, s.tracing)
if err != nil {
return err
}

View File

@@ -9,6 +9,7 @@ import (
"context"
"testing"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/apitesting"
@@ -71,7 +72,11 @@ func createTestContext(t *testing.T) (entityStore.EntityStoreClient, factory.Des
err = eDB.Init()
require.NoError(t, err)
store, err := sqlstash.ProvideSQLEntityServer(eDB)
traceConfig, err := tracing.ParseTracingConfig(cfg)
require.NoError(t, err)
tracer, err := tracing.ProvideService(traceConfig)
require.NoError(t, err)
store, err := sqlstash.ProvideSQLEntityServer(eDB, tracer)
require.NoError(t, err)
client := entityStore.NewEntityStoreClientLocal(store)

View File

@@ -0,0 +1,29 @@
package interceptors
import (
"context"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/setting"
"google.golang.org/grpc"
)
func LoggingUnaryInterceptor(cfg *setting.Cfg, logger log.Logger) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req any,
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp any, err error) {
resp, err = handler(ctx, req)
if cfg.GRPCServerEnableLogging {
ctxLogger := logger.FromContext(ctx)
if err != nil {
ctxLogger.Error("gRPC call", "method", info.FullMethod, "req", req, "err", err)
} else {
ctxLogger.Info("gRPC call", "method", info.FullMethod, "req", req, "resp", resp)
}
}
return resp, err
}
}

View File

@@ -73,6 +73,7 @@ func ProvideService(cfg *setting.Cfg, features featuremgmt.FeatureToggles, authe
grpc_middleware.ChainUnaryServer(
grpcAuth.UnaryServerInterceptor(authenticator.Authenticate),
interceptors.TracingUnaryInterceptor(tracer),
interceptors.LoggingUnaryInterceptor(s.cfg, s.logger), // needs to be registered after tracing interceptor to get trace id
middleware.UnaryServerInstrumentInterceptor(grpcRequestDuration),
),
),

View File

@@ -64,6 +64,7 @@ func ProvideService(
if err != nil {
return nil, err
}
tracingCfg.ServiceName = "unified-storage"
tracing, err := tracing.ProvideService(tracingCfg)
if err != nil {
@@ -114,7 +115,7 @@ func (s *service) start(ctx context.Context) error {
return err
}
store, err := sqlstash.ProvideSQLEntityServer(eDB)
store, err := sqlstash.ProvideSQLEntityServer(eDB, s.tracing)
if err != nil {
return err
}

View File

@@ -20,22 +20,30 @@ import (
folder "github.com/grafana/grafana/pkg/apis/folder/v0alpha1"
"github.com/grafana/grafana/pkg/infra/appcontext"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/services/sqlstore/session"
"github.com/grafana/grafana/pkg/services/store"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
const entityTable = "entity"
const entityHistoryTable = "entity_history"
var (
errorUserNotFoundInContext = errors.New("can not find user in context")
errorNextPageTokenNotSupported = errors.New("nextPageToken not yet supported")
errorEntityAlreadyExists = errors.New("entity already exists")
)
// Make sure we implement correct interfaces
var _ entity.EntityStoreServer = &sqlEntityServer{}
var _ SqlEntityServer = &sqlEntityServer{}
func ProvideSQLEntityServer(db db.EntityDBInterface /*, cfg *setting.Cfg */) (SqlEntityServer, error) {
func ProvideSQLEntityServer(db db.EntityDBInterface, tracer tracing.Tracer /*, cfg *setting.Cfg */) (SqlEntityServer, error) {
ctx, cancel := context.WithCancel(context.Background())
entityServer := &sqlEntityServer{
@@ -43,6 +51,7 @@ func ProvideSQLEntityServer(db db.EntityDBInterface /*, cfg *setting.Cfg */) (Sq
log: log.New("sql-entity-server"),
ctx: ctx,
cancel: cancel,
tracer: tracer,
}
if err := prometheus.Register(NewStorageMetrics()); err != nil {
@@ -69,6 +78,7 @@ type sqlEntityServer struct {
ctx context.Context
cancel context.CancelFunc
stream chan *entity.EntityWatchResponse
tracer tracing.Tracer
}
func (s *sqlEntityServer) Init() error {
@@ -224,14 +234,26 @@ func readEntity(rows *sql.Rows, r FieldSelectRequest) (*entity.Entity, error) {
}
func (s *sqlEntityServer) Read(ctx context.Context, r *entity.ReadEntityRequest) (*entity.Entity, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.Read")
defer span.End()
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "read"}))
if err := s.Init(); err != nil {
ctxLogger.Error("init error", "error", err)
return nil, err
}
return s.read(ctx, s.sess, r)
res, err := s.read(ctx, s.sess, r)
if err != nil {
ctxLogger.Error("read error", "error", err)
}
return res, err
}
func (s *sqlEntityServer) read(ctx context.Context, tx session.SessionQuerier, r *entity.ReadEntityRequest) (*entity.Entity, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.read")
defer span.End()
table := entityTable
where := []string{}
args := []any{}
@@ -288,7 +310,12 @@ func (s *sqlEntityServer) read(ctx context.Context, tx session.SessionQuerier, r
//nolint:gocyclo
func (s *sqlEntityServer) Create(ctx context.Context, r *entity.CreateEntityRequest) (*entity.CreateEntityResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.Create")
defer span.End()
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "create"}))
if err := s.Init(); err != nil {
ctxLogger.Error("init error", "error", err)
return nil, err
}
@@ -301,10 +328,12 @@ func (s *sqlEntityServer) Create(ctx context.Context, r *entity.CreateEntityRequ
if createdBy == "" {
modifier, err := appcontext.User(ctx)
if err != nil {
ctxLogger.Error("error getting user from ctx", "error", err)
return nil, err
}
if modifier == nil {
return nil, fmt.Errorf("can not find user in context")
ctxLogger.Error("could not find user in context", "error", errorUserNotFoundInContext)
return nil, err
}
createdBy = store.GetUserIDString(modifier)
}
@@ -329,7 +358,8 @@ func (s *sqlEntityServer) Create(ctx context.Context, r *entity.CreateEntityRequ
// if we found an existing entity
if current.Guid != "" {
return fmt.Errorf("entity already exists")
ctxLogger.Error("entity already exists", "error", errorEntityAlreadyExists)
return errorEntityAlreadyExists
}
// generate guid for new entity
@@ -342,6 +372,7 @@ func (s *sqlEntityServer) Create(ctx context.Context, r *entity.CreateEntityRequ
// parse provided key
key, err := entity.ParseKey(r.Entity.Key)
if err != nil {
ctxLogger.Error("error parsing key", "error", err)
return err
}
@@ -387,21 +418,21 @@ func (s *sqlEntityServer) Create(ctx context.Context, r *entity.CreateEntityRequ
labels, err := json.Marshal(r.Entity.Labels)
if err != nil {
s.log.Error("error marshalling labels", "msg", err.Error())
ctxLogger.Error("error marshalling labels", "msg", err.Error())
return err
}
current.Labels = r.Entity.Labels
fields, err := json.Marshal(r.Entity.Fields)
if err != nil {
s.log.Error("error marshalling fields", "msg", err.Error())
ctxLogger.Error("error marshalling fields", "msg", err.Error())
return err
}
current.Fields = r.Entity.Fields
errors, err := json.Marshal(r.Entity.Errors)
if err != nil {
s.log.Error("error marshalling errors", "msg", err.Error())
ctxLogger.Error("error marshalling errors", "msg", err.Error())
return err
}
current.Errors = r.Entity.Errors
@@ -465,14 +496,14 @@ func (s *sqlEntityServer) Create(ctx context.Context, r *entity.CreateEntityRequ
}
// 1. Add row to the `entity_history` values
if err := s.dialect.Insert(ctx, tx, entityHistoryTable, values); err != nil {
s.log.Error("error inserting entity history", "msg", err.Error())
if err = s.insert(ctx, tx, entityHistoryTable, values); err != nil {
ctxLogger.Error("insert entity_history error", "error", err)
return err
}
// 2. Add row to the main `entity` table
if err := s.dialect.Insert(ctx, tx, entityTable, values); err != nil {
s.log.Error("error inserting entity", "msg", err.Error())
if err = s.insert(ctx, tx, entityTable, values); err != nil {
ctxLogger.Error("insert entity error", "error", err)
return err
}
@@ -482,7 +513,7 @@ func (s *sqlEntityServer) Create(ctx context.Context, r *entity.CreateEntityRequ
case folder.RESOURCE:
err = s.updateFolderTree(ctx, tx, current.Namespace)
if err != nil {
s.log.Error("error updating folder tree", "msg", err.Error())
ctxLogger.Error("error updating folder tree", "error", err.Error())
return err
}
}
@@ -493,7 +524,7 @@ func (s *sqlEntityServer) Create(ctx context.Context, r *entity.CreateEntityRequ
return s.setLabels(ctx, tx, current.Guid, current.Labels)
})
if err != nil {
s.log.Error("error creating entity", "msg", err.Error())
ctxLogger.Error("error creating entity", "msg", err.Error())
rsp.Status = entity.CreateEntityResponse_ERROR
}
@@ -508,7 +539,12 @@ func (s *sqlEntityServer) Create(ctx context.Context, r *entity.CreateEntityRequ
//nolint:gocyclo
func (s *sqlEntityServer) Update(ctx context.Context, r *entity.UpdateEntityRequest) (*entity.UpdateEntityResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.Update")
defer span.End()
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "update"}))
if err := s.Init(); err != nil {
ctxLogger.Error("init error", "error", err)
return nil, err
}
@@ -521,10 +557,12 @@ func (s *sqlEntityServer) Update(ctx context.Context, r *entity.UpdateEntityRequ
if updatedBy == "" {
modifier, err := appcontext.User(ctx)
if err != nil {
ctxLogger.Error("error getting user from ctx", "error", err)
return nil, err
}
if modifier == nil {
return nil, fmt.Errorf("can not find user in context")
ctxLogger.Error("could not find user in context", "error", errorUserNotFoundInContext)
return nil, errorUserNotFoundInContext
}
updatedBy = store.GetUserIDString(modifier)
}
@@ -606,21 +644,21 @@ func (s *sqlEntityServer) Update(ctx context.Context, r *entity.UpdateEntityRequ
labels, err := json.Marshal(r.Entity.Labels)
if err != nil {
s.log.Error("error marshalling labels", "msg", err.Error())
ctxLogger.Error("error marshalling labels", "msg", err.Error())
return err
}
updated.Labels = r.Entity.Labels
fields, err := json.Marshal(r.Entity.Fields)
if err != nil {
s.log.Error("error marshalling fields", "msg", err.Error())
ctxLogger.Error("error marshalling fields", "msg", err.Error())
return err
}
updated.Fields = r.Entity.Fields
errors, err := json.Marshal(r.Entity.Errors)
if err != nil {
s.log.Error("error marshalling errors", "msg", err.Error())
ctxLogger.Error("error marshalling errors", "msg", err.Error())
return err
}
updated.Errors = r.Entity.Errors
@@ -686,8 +724,7 @@ func (s *sqlEntityServer) Update(ctx context.Context, r *entity.UpdateEntityRequ
}
// 1. Add the `entity_history` values
if err := s.dialect.Insert(ctx, tx, entityHistoryTable, values); err != nil {
s.log.Error("error inserting entity history", "msg", err.Error())
if err := s.insert(ctx, tx, entityHistoryTable, values); err != nil {
return err
}
@@ -703,7 +740,7 @@ func (s *sqlEntityServer) Update(ctx context.Context, r *entity.UpdateEntityRequ
delete(values, "created_at")
delete(values, "created_by")
err = s.dialect.Update(
err = s.update(
ctx,
tx,
entityTable,
@@ -713,7 +750,7 @@ func (s *sqlEntityServer) Update(ctx context.Context, r *entity.UpdateEntityRequ
},
)
if err != nil {
s.log.Error("error updating entity", "msg", err.Error())
ctxLogger.Error("error updating entity", "error", err.Error())
return err
}
@@ -723,7 +760,7 @@ func (s *sqlEntityServer) Update(ctx context.Context, r *entity.UpdateEntityRequ
case folder.RESOURCE:
err = s.updateFolderTree(ctx, tx, updated.Namespace)
if err != nil {
s.log.Error("error updating folder tree", "msg", err.Error())
ctxLogger.Error("error updating folder tree", "msg", err.Error())
return err
}
}
@@ -734,7 +771,7 @@ func (s *sqlEntityServer) Update(ctx context.Context, r *entity.UpdateEntityRequ
return s.setLabels(ctx, tx, updated.Guid, updated.Labels)
})
if err != nil {
s.log.Error("error updating entity", "msg", err.Error())
ctxLogger.Error("error updating entity", "msg", err.Error())
rsp.Status = entity.UpdateEntityResponse_ERROR
}
@@ -750,6 +787,9 @@ func (s *sqlEntityServer) Update(ctx context.Context, r *entity.UpdateEntityRequ
}
func (s *sqlEntityServer) setLabels(ctx context.Context, tx *session.SessionTx, guid string, labels map[string]string) error {
ctx, span := s.tracer.Start(ctx, "storage_server.setLabels")
defer span.End()
s.log.Debug("setLabels", "guid", guid, "labels", labels)
// Clear the old labels
@@ -781,7 +821,12 @@ func (s *sqlEntityServer) setLabels(ctx context.Context, tx *session.SessionTx,
}
func (s *sqlEntityServer) Delete(ctx context.Context, r *entity.DeleteEntityRequest) (*entity.DeleteEntityResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.Delete")
defer span.End()
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "delete"}))
if err := s.Init(); err != nil {
ctxLogger.Error("init error", "error", err)
return nil, err
}
@@ -826,6 +871,9 @@ func (s *sqlEntityServer) Delete(ctx context.Context, r *entity.DeleteEntityRequ
rsp.Status = entity.DeleteEntityResponse_DELETED
return nil
})
if err != nil {
ctxLogger.Error("delete error", "error", err)
}
if rsp.Status == entity.DeleteEntityResponse_DELETED {
// k8s expects us to return the entity as it was before the deletion, but with the updated RV
@@ -846,6 +894,10 @@ func (s *sqlEntityServer) Delete(ctx context.Context, r *entity.DeleteEntityRequ
}
func (s *sqlEntityServer) doDelete(ctx context.Context, tx *session.SessionTx, ent *entity.Entity) (*entity.Entity, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.doDelete")
defer span.End()
ctxLogger := s.log.FromContext(ctx)
updated := proto.Clone(ent).(*entity.Entity)
// Update resource version
@@ -864,19 +916,19 @@ func (s *sqlEntityServer) doDelete(ctx context.Context, tx *session.SessionTx, e
labels, err := json.Marshal(updated.Labels)
if err != nil {
s.log.Error("error marshalling labels", "msg", err.Error())
ctxLogger.Error("error marshalling labels", "msg", err.Error())
return nil, err
}
fields, err := json.Marshal(updated.Fields)
if err != nil {
s.log.Error("error marshalling fields", "msg", err.Error())
ctxLogger.Error("error marshalling fields", "msg", err.Error())
return nil, err
}
errors, err := json.Marshal(updated.Errors)
if err != nil {
s.log.Error("error marshalling errors", "msg", err.Error())
ctxLogger.Error("error marshalling errors", "msg", err.Error())
return nil, err
}
@@ -922,21 +974,17 @@ func (s *sqlEntityServer) doDelete(ctx context.Context, tx *session.SessionTx, e
}
// 1. Add the `entity_history` values
if err := s.dialect.Insert(ctx, tx, entityHistoryTable, values); err != nil {
s.log.Error("error inserting entity history", "msg", err.Error())
if err := s.insert(ctx, tx, entityHistoryTable, values); err != nil {
return nil, err
}
_, err = tx.Exec(ctx, "DELETE FROM entity WHERE guid=?", updated.Guid)
if err != nil {
if err = s.exec(ctx, tx, "DELETE FROM entity WHERE guid=?", updated.Guid); err != nil {
return nil, err
}
_, err = tx.Exec(ctx, "DELETE FROM entity_labels WHERE guid=?", updated.Guid)
if err != nil {
if err = s.exec(ctx, tx, "DELETE FROM entity_labels WHERE guid=?", updated.Guid); err != nil {
return nil, err
}
_, err = tx.Exec(ctx, "DELETE FROM entity_ref WHERE guid=?", updated.Guid)
if err != nil {
if err = s.exec(ctx, tx, "DELETE FROM entity_ref WHERE guid=?", updated.Guid); err != nil {
return nil, err
}
@@ -956,22 +1004,36 @@ func (s *sqlEntityServer) doDelete(ctx context.Context, tx *session.SessionTx, e
}
func (s *sqlEntityServer) History(ctx context.Context, r *entity.EntityHistoryRequest) (*entity.EntityHistoryResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.History")
defer span.End()
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "history"}))
if err := s.Init(); err != nil {
ctxLogger.Error("init error", "error", err)
return nil, err
}
user, err := appcontext.User(ctx)
if err != nil {
ctxLogger.Error("error getting user from ctx", "error", err)
return nil, err
}
if user == nil {
return nil, fmt.Errorf("missing user in context")
ctxLogger.Error("could not find user in context", "error", errorUserNotFoundInContext)
return nil, errorUserNotFoundInContext
}
return s.history(ctx, r)
res, err := s.history(ctx, r)
if err != nil {
ctxLogger.Error("history error", "error", err)
}
return res, err
}
func (s *sqlEntityServer) history(ctx context.Context, r *entity.EntityHistoryRequest) (*entity.EntityHistoryResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.history")
defer span.End()
var limit int64 = 100
if r.Limit > 0 && r.Limit < 100 {
limit = r.Limit
@@ -1039,7 +1101,7 @@ func (s *sqlEntityServer) history(ctx context.Context, r *entity.EntityHistoryRe
s.log.Debug("history", "query", query, "args", args)
rows, err := s.sess.Query(ctx, query, args...)
rows, err := s.query(ctx, query, args...)
if err != nil {
return nil, err
}
@@ -1148,16 +1210,23 @@ func ParseSortBy(sort string) (*SortBy, error) {
//nolint:gocyclo
func (s *sqlEntityServer) List(ctx context.Context, r *entity.EntityListRequest) (*entity.EntityListResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.List")
defer span.End()
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "list"}))
if err := s.Init(); err != nil {
ctxLogger.Error("init error", "error", err)
return nil, err
}
user, err := appcontext.User(ctx)
if err != nil {
ctxLogger.Error("error getting user from ctx", "error", err)
return nil, err
}
if user == nil {
return nil, fmt.Errorf("missing user in context")
ctxLogger.Error("could not find user in context", "error", errorUserNotFoundInContext)
return nil, errorUserNotFoundInContext
}
fields := s.getReadFields(r)
@@ -1242,15 +1311,17 @@ func (s *sqlEntityServer) List(ctx context.Context, r *entity.EntityListRequest)
err = s.sess.Get(ctx, rvMaxRow, query, args...)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
ctxLogger.Error("error running rvMaxQuery", "error", err)
return nil, err
}
}
s.log.Debug("getting max rv", "maxRv", rvMaxRow.Rv, "cnt", rvMaxRow.Cnt, "query", query, "args", args)
ctxLogger.Debug("getting max rv", "maxRv", rvMaxRow.Rv, "cnt", rvMaxRow.Cnt, "query", query, "args", args)
// if we have a page token, use that to specify the first record
continueToken, err := GetContinueToken(r)
if err != nil {
ctxLogger.Error("error getting continue token", "error", err)
return nil, err
}
if continueToken != nil {
@@ -1333,16 +1404,18 @@ func (s *sqlEntityServer) List(ctx context.Context, r *entity.EntityListRequest)
query, args = entityQuery.ToQuery()
s.log.Debug("listing", "query", query, "args", args)
ctxLogger.Debug("listing", "query", query, "args", args)
rows, err := s.sess.Query(ctx, query, args...)
rows, err := s.query(ctx, query, args...)
if err != nil {
ctxLogger.Error("error running list query", "error", err)
return nil, err
}
defer func() { _ = rows.Close() }()
for rows.Next() {
result, err := readEntity(rows, r)
if err != nil {
ctxLogger.Error("error reading rows to entity", "error", err)
return rsp, err
}
@@ -1355,33 +1428,40 @@ func (s *sqlEntityServer) List(ctx context.Context, r *entity.EntityListRequest)
rsp.Results = append(rsp.Results, result)
}
span.AddEvent("processed rows", trace.WithAttributes(attribute.Int("row_count", len(rsp.Results))))
return rsp, err
}
func (s *sqlEntityServer) Watch(w entity.EntityStore_WatchServer) error {
ctxLogger := s.log.FromContext(log.WithContextualAttributes(w.Context(), []any{"method", "watch"}))
if err := s.Init(); err != nil {
ctxLogger.Error("init error", "error", err)
return err
}
user, err := appcontext.User(w.Context())
if err != nil {
ctxLogger.Error("error getting user from ctx", "error", err)
return err
}
if user == nil {
return fmt.Errorf("missing user in context")
ctxLogger.Error("could not find user in context", "error", errorUserNotFoundInContext)
return errorUserNotFoundInContext
}
r, err := w.Recv()
if err != nil {
ctxLogger.Error("recv error", "error", err)
return err
}
// collect and send any historical events
if r.SendInitialEvents {
r.Since, err = s.watchInit(r, w)
r.Since, err = s.watchInit(w.Context(), r, w)
if err != nil {
s.log.Error("watch init error", "err", err)
ctxLogger.Error("watch init error", "err", err)
return err
}
} else if r.Since == 0 {
@@ -1391,7 +1471,7 @@ func (s *sqlEntityServer) Watch(w entity.EntityStore_WatchServer) error {
// subscribe to new events
err = s.watch(r, w)
if err != nil {
s.log.Error("watch error", "err", err)
ctxLogger.Error("watch error", "err", err)
return err
}
@@ -1399,7 +1479,11 @@ func (s *sqlEntityServer) Watch(w entity.EntityStore_WatchServer) error {
}
// watchInit is a helper function to send the initial set of entities to the client
func (s *sqlEntityServer) watchInit(r *entity.EntityWatchRequest, w entity.EntityStore_WatchServer) (int64, error) {
func (s *sqlEntityServer) watchInit(ctx context.Context, r *entity.EntityWatchRequest, w entity.EntityStore_WatchServer) (int64, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.watchInit")
defer span.End()
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "watchInit"}))
lastRv := r.Since
fields := s.getReadFields(r)
@@ -1426,6 +1510,7 @@ func (s *sqlEntityServer) watchInit(r *entity.EntityWatchRequest, w entity.Entit
for _, k := range r.Key {
key, err := entity.ParseKey(k)
if err != nil {
ctxLogger.Error("error parsing key", "error", err, "key", k)
return lastRv, err
}
@@ -1483,9 +1568,9 @@ func (s *sqlEntityServer) watchInit(r *entity.EntityWatchRequest, w entity.Entit
err = func() error {
query, args := entityQuery.ToQuery()
s.log.Debug("watch init", "query", query, "args", args)
ctxLogger.Debug("watch init", "query", query, "args", args)
rows, err := s.sess.Query(w.Context(), query, args...)
rows, err := s.query(ctx, query, args...)
if err != nil {
return err
}
@@ -1514,7 +1599,7 @@ func (s *sqlEntityServer) watchInit(r *entity.EntityWatchRequest, w entity.Entit
Entity: result,
}
s.log.Debug("sending init event", "guid", result.Guid, "action", result.Action, "rv", result.ResourceVersion)
ctxLogger.Debug("sending init event", "guid", result.Guid, "action", result.Action, "rv", result.ResourceVersion)
err = w.Send(resp)
if err != nil {
@@ -1526,6 +1611,7 @@ func (s *sqlEntityServer) watchInit(r *entity.EntityWatchRequest, w entity.Entit
return nil
}()
if err != nil {
ctxLogger.Error("watchInit error", "error", err)
return lastRv, err
}
}
@@ -1541,6 +1627,7 @@ func (s *sqlEntityServer) watchInit(r *entity.EntityWatchRequest, w entity.Entit
}
err = w.Send(resp)
if err != nil {
ctxLogger.Error("error sending bookmark event", "error", err)
return lastRv, err
}
}
@@ -1572,6 +1659,10 @@ func (s *sqlEntityServer) poller(stream chan *entity.EntityWatchResponse) {
}
func (s *sqlEntityServer) poll(since int64, out chan *entity.EntityWatchResponse) (int64, error) {
ctx, span := s.tracer.Start(s.ctx, "storage_server.poll")
defer span.End()
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "poll"}))
rr := &entity.ReadEntityRequest{
WithBody: true,
WithStatus: true,
@@ -1595,7 +1686,7 @@ func (s *sqlEntityServer) poll(since int64, out chan *entity.EntityWatchResponse
query, args := entityQuery.ToQuery()
rows, err := s.sess.Query(s.ctx, query, args...)
rows, err := s.query(ctx, query, args...)
if err != nil {
return err
}
@@ -1604,7 +1695,7 @@ func (s *sqlEntityServer) poll(since int64, out chan *entity.EntityWatchResponse
found := int64(0)
for rows.Next() {
// check if the context is done
if s.ctx.Err() != nil {
if ctx.Err() != nil {
hasmore = false
return nil
}
@@ -1616,6 +1707,7 @@ func (s *sqlEntityServer) poll(since int64, out chan *entity.EntityWatchResponse
updated, err := readEntity(rows, rr)
if err != nil {
ctxLogger.Error("poll error readEntity", "error", err)
return err
}
@@ -1637,16 +1729,16 @@ func (s *sqlEntityServer) poll(since int64, out chan *entity.EntityWatchResponse
WithBody: rr.WithBody,
WithStatus: rr.WithStatus,
}
history, err := s.history(s.ctx, rr)
history, err := s.history(ctx, rr)
if err != nil {
s.log.Error("error reading previous entity", "guid", updated.Guid, "err", err)
ctxLogger.Error("error reading previous entity", "guid", updated.Guid, "err", err)
return err
}
result.Previous = history.Versions[0]
}
s.log.Debug("sending poll result", "guid", updated.Guid, "action", updated.Action, "rv", updated.ResourceVersion)
ctxLogger.Debug("sending poll result", "guid", updated.Guid, "action", updated.Action, "rv", updated.ResourceVersion)
out <- result
}
@@ -1654,6 +1746,7 @@ func (s *sqlEntityServer) poll(since int64, out chan *entity.EntityWatchResponse
return nil
}()
if err != nil {
ctxLogger.Error("poll error", "error", err)
return since, err
}
}
@@ -1827,20 +1920,28 @@ func (s *sqlEntityServer) watchEvent(r *entity.EntityWatchRequest, result *entit
}
func (s *sqlEntityServer) FindReferences(ctx context.Context, r *entity.ReferenceRequest) (*entity.EntityListResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.FindReferences")
defer span.End()
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "findReferences"}))
if err := s.Init(); err != nil {
ctxLogger.Error("init error", "error", err)
return nil, err
}
user, err := appcontext.User(ctx)
if err != nil {
ctxLogger.Error("error getting user from ctx", "error", err)
return nil, err
}
if user == nil {
return nil, fmt.Errorf("missing user in context")
ctxLogger.Error("could not find user in context", "error", errorUserNotFoundInContext)
return nil, errorUserNotFoundInContext
}
if r.NextPageToken != "" {
return nil, fmt.Errorf("not yet supported")
ctxLogger.Error("nextPageToken not yet supported", "error", errorNextPageTokenNotSupported)
return nil, errorNextPageTokenNotSupported
}
fields := []string{
@@ -1855,8 +1956,9 @@ func (s *sqlEntityServer) FindReferences(ctx context.Context, r *entity.Referenc
" FROM entity_ref AS er JOIN entity AS e ON er.guid = e.guid" +
" WHERE er.namespace=? AND er.group=? AND er.resource=? AND er.resolved_to=?"
rows, err := s.sess.Query(ctx, sql, r.Namespace, r.Group, r.Resource, r.Name)
rows, err := s.query(ctx, sql, r.Namespace, r.Group, r.Resource, r.Name)
if err != nil {
ctxLogger.Error("query error", "error", err)
return nil, err
}
defer func() { _ = rows.Close() }()
@@ -1875,6 +1977,7 @@ func (s *sqlEntityServer) FindReferences(ctx context.Context, r *entity.Referenc
err = rows.Scan(args...)
if err != nil {
ctxLogger.Error("error scanning rows", "error", err)
return rsp, err
}
@@ -1890,3 +1993,44 @@ func (s *sqlEntityServer) FindReferences(ctx context.Context, r *entity.Referenc
return rsp, err
}
func (s *sqlEntityServer) query(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.query", trace.WithAttributes(attribute.String("query", query)))
defer span.End()
rows, err := s.sess.Query(ctx, query, args...)
if err != nil {
return nil, err
}
return rows, nil
}
func (s *sqlEntityServer) exec(ctx context.Context, tx *session.SessionTx, statement string, args ...any) error {
ctx, span := s.tracer.Start(ctx, "storage_server.exec", trace.WithAttributes(attribute.String("statement", statement)))
defer span.End()
_, err := tx.Exec(ctx, statement, args...)
return err
}
func (s *sqlEntityServer) insert(ctx context.Context, tx *session.SessionTx, table string, values map[string]any) error {
ctx, span := s.tracer.Start(ctx, "storage_server.insert", trace.WithAttributes(attribute.String("table", table)))
defer span.End()
err := s.dialect.Insert(ctx, tx, table, values)
return err
}
func (s *sqlEntityServer) update(ctx context.Context, tx *session.SessionTx, table string, row map[string]any, where map[string]any) error {
ctx, span := s.tracer.Start(ctx, "storage_server.db_update", trace.WithAttributes(attribute.String("table", table)))
defer span.End()
err := s.dialect.Update(
ctx,
tx,
table,
row,
where,
)
return err
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"testing"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/db"
@@ -132,7 +133,12 @@ func setUpTestServer(t *testing.T) entity.EntityStoreServer {
featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorage))
require.NoError(t, err)
s, err := ProvideSQLEntityServer(entityDB)
traceConfig, err := tracing.ParseTracingConfig(sqlStore.Cfg)
require.NoError(t, err)
tracer, err := tracing.ProvideService(traceConfig)
require.NoError(t, err)
s, err := ProvideSQLEntityServer(entityDB, tracer)
require.NoError(t, err)
return s
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"testing"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/components/satokengen"
@@ -85,7 +86,11 @@ func createTestContext(t *testing.T) testContext {
err = eDB.Init()
require.NoError(t, err)
store, err := sqlstash.ProvideSQLEntityServer(eDB)
traceConfig, err := tracing.ParseTracingConfig(env.Cfg)
require.NoError(t, err)
tracer, err := tracing.ProvideService(traceConfig)
require.NoError(t, err)
store, err := sqlstash.ProvideSQLEntityServer(eDB, tracer)
require.NoError(t, err)
client := entity.NewEntityStoreClientLocal(store)

View File

@@ -470,9 +470,10 @@ type Cfg struct {
RBACSingleOrganization bool
// GRPC Server.
GRPCServerNetwork string
GRPCServerAddress string
GRPCServerTLSConfig *tls.Config
GRPCServerNetwork string
GRPCServerAddress string
GRPCServerTLSConfig *tls.Config
GRPCServerEnableLogging bool // log request and response of each unary gRPC call
CustomResponseHeaders map[string]string
@@ -1756,6 +1757,7 @@ func readGRPCServerSettings(cfg *Cfg, iniFile *ini.File) error {
cfg.GRPCServerNetwork = valueAsString(server, "network", "tcp")
cfg.GRPCServerAddress = valueAsString(server, "address", "")
cfg.GRPCServerEnableLogging = server.Key("enable_logging").MustBool(false)
switch cfg.GRPCServerNetwork {
case "unix":
if cfg.GRPCServerAddress != "" {