mirror of
https://github.com/grafana/grafana.git
synced 2025-01-01 11:47:05 -06:00
Instrumentation: Start tracing database requests (#34572)
Signed-off-by: bergquist <carl.bergquist@gmail.com>
This commit is contained in:
parent
29d34974e7
commit
a10fa5cad3
@ -1,6 +1,7 @@
|
||||
package datamigrations
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -15,7 +16,7 @@ import (
|
||||
func TestPasswordMigrationCommand(t *testing.T) {
|
||||
// setup datasources with password, basic_auth and none
|
||||
sqlstore := sqlstore.InitTestDB(t)
|
||||
session := sqlstore.NewSession()
|
||||
session := sqlstore.NewSession(context.Background())
|
||||
defer session.Close()
|
||||
|
||||
datasources := []*models.DataSource{
|
||||
|
@ -54,7 +54,7 @@ func (dc *databaseCache) internalRunGC() {
|
||||
|
||||
func (dc *databaseCache) Get(key string) (interface{}, error) {
|
||||
cacheHit := CacheData{}
|
||||
session := dc.SQLStore.NewSession()
|
||||
session := dc.SQLStore.NewSession(context.Background())
|
||||
defer session.Close()
|
||||
|
||||
exist, err := session.Where("cache_key= ?", key).Get(&cacheHit)
|
||||
@ -93,7 +93,7 @@ func (dc *databaseCache) Set(key string, value interface{}, expire time.Duration
|
||||
return err
|
||||
}
|
||||
|
||||
session := dc.SQLStore.NewSession()
|
||||
session := dc.SQLStore.NewSession(context.Background())
|
||||
defer session.Close()
|
||||
|
||||
var expiresInSeconds int64
|
||||
|
@ -547,7 +547,7 @@ type testContext struct {
|
||||
}
|
||||
|
||||
func (c *testContext) getAuthTokenByID(id int64) (*userAuthToken, error) {
|
||||
sess := c.sqlstore.NewSession()
|
||||
sess := c.sqlstore.NewSession(context.Background())
|
||||
var t userAuthToken
|
||||
found, err := sess.ID(id).Get(&t)
|
||||
if err != nil || !found {
|
||||
@ -558,7 +558,7 @@ func (c *testContext) getAuthTokenByID(id int64) (*userAuthToken, error) {
|
||||
}
|
||||
|
||||
func (c *testContext) markAuthTokenAsSeen(id int64) (bool, error) {
|
||||
sess := c.sqlstore.NewSession()
|
||||
sess := c.sqlstore.NewSession(context.Background())
|
||||
res, err := sess.Exec("UPDATE user_auth_token SET auth_token_seen = ? WHERE id = ?", c.sqlstore.Dialect.BooleanStr(true), id)
|
||||
if err != nil {
|
||||
return false, err
|
||||
@ -572,7 +572,7 @@ func (c *testContext) markAuthTokenAsSeen(id int64) (bool, error) {
|
||||
}
|
||||
|
||||
func (c *testContext) updateRotatedAt(id, rotatedAt int64) (bool, error) {
|
||||
sess := c.sqlstore.NewSession()
|
||||
sess := c.sqlstore.NewSession(context.Background())
|
||||
res, err := sess.Exec("UPDATE user_auth_token SET rotated_at = ? WHERE id = ?", rotatedAt, id)
|
||||
if err != nil {
|
||||
return false, err
|
||||
|
@ -19,7 +19,7 @@ func TestUserAuthTokenCleanup(t *testing.T) {
|
||||
|
||||
insertToken := func(token string, prev string, createdAt, rotatedAt int64) {
|
||||
ut := userAuthToken{AuthToken: token, PrevAuthToken: prev, CreatedAt: createdAt, RotatedAt: rotatedAt, UserAgent: "", ClientIp: ""}
|
||||
_, err := ctx.sqlstore.NewSession().Insert(&ut)
|
||||
_, err := ctx.sqlstore.NewSession(context.Background()).Insert(&ut)
|
||||
So(err, ShouldBeNil)
|
||||
}
|
||||
|
||||
|
@ -56,7 +56,7 @@ func DeleteAlertNotification(cmd *models.DeleteAlertNotificationCommand) error {
|
||||
|
||||
func DeleteAlertNotificationWithUid(cmd *models.DeleteAlertNotificationWithUidCommand) error {
|
||||
existingNotification := &models.GetAlertNotificationsWithUidQuery{OrgId: cmd.OrgId, Uid: cmd.Uid}
|
||||
if err := getAlertNotificationWithUidInternal(existingNotification, newSession()); err != nil {
|
||||
if err := getAlertNotificationWithUidInternal(existingNotification, newSession(context.Background())); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -77,7 +77,7 @@ func DeleteAlertNotificationWithUid(cmd *models.DeleteAlertNotificationWithUidCo
|
||||
}
|
||||
|
||||
func GetAlertNotifications(query *models.GetAlertNotificationsQuery) error {
|
||||
return getAlertNotificationInternal(query, newSession())
|
||||
return getAlertNotificationInternal(query, newSession(context.Background()))
|
||||
}
|
||||
|
||||
func (ss *SQLStore) addAlertNotificationUidByIdHandler() {
|
||||
@ -92,7 +92,7 @@ func (ss *SQLStore) GetAlertNotificationUidWithId(query *models.GetAlertNotifica
|
||||
return nil
|
||||
}
|
||||
|
||||
err := getAlertNotificationUidInternal(query, newSession())
|
||||
err := getAlertNotificationUidInternal(query, newSession(context.Background()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -107,7 +107,7 @@ func newAlertNotificationUidCacheKey(orgID, notificationId int64) string {
|
||||
}
|
||||
|
||||
func GetAlertNotificationsWithUid(query *models.GetAlertNotificationsWithUidQuery) error {
|
||||
return getAlertNotificationWithUidInternal(query, newSession())
|
||||
return getAlertNotificationWithUidInternal(query, newSession(context.Background()))
|
||||
}
|
||||
|
||||
func GetAllAlertNotifications(query *models.GetAllAlertNotificationsQuery) error {
|
||||
@ -442,7 +442,7 @@ func UpdateAlertNotification(cmd *models.UpdateAlertNotificationCommand) error {
|
||||
func UpdateAlertNotificationWithUid(cmd *models.UpdateAlertNotificationWithUidCommand) error {
|
||||
getAlertNotificationWithUidQuery := &models.GetAlertNotificationsWithUidQuery{OrgId: cmd.OrgId, Uid: cmd.Uid}
|
||||
|
||||
if err := getAlertNotificationWithUidInternal(getAlertNotificationWithUidQuery, newSession()); err != nil {
|
||||
if err := getAlertNotificationWithUidInternal(getAlertNotificationWithUidQuery, newSession(context.Background())); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -129,7 +129,7 @@ func TestOldAnnotationsAreDeletedFirst(t *testing.T) {
|
||||
Created: time.Now().AddDate(-10, 0, -10).UnixNano() / int64(time.Millisecond),
|
||||
}
|
||||
|
||||
session := fakeSQL.NewSession()
|
||||
session := fakeSQL.NewSession(context.Background())
|
||||
defer session.Close()
|
||||
|
||||
_, err := session.Insert(a)
|
||||
@ -159,7 +159,7 @@ func TestOldAnnotationsAreDeletedFirst(t *testing.T) {
|
||||
func assertAnnotationCount(t *testing.T, fakeSQL *SQLStore, sql string, expectedCount int64) {
|
||||
t.Helper()
|
||||
|
||||
session := fakeSQL.NewSession()
|
||||
session := fakeSQL.NewSession(context.Background())
|
||||
defer session.Close()
|
||||
count, err := session.Where(sql).Count(&annotations.Item{})
|
||||
require.NoError(t, err)
|
||||
@ -169,7 +169,7 @@ func assertAnnotationCount(t *testing.T, fakeSQL *SQLStore, sql string, expected
|
||||
func assertAnnotationTagCount(t *testing.T, fakeSQL *SQLStore, expectedCount int64) {
|
||||
t.Helper()
|
||||
|
||||
session := fakeSQL.NewSession()
|
||||
session := fakeSQL.NewSession(context.Background())
|
||||
defer session.Close()
|
||||
|
||||
count, err := session.SQL("select count(*) from annotation_tag").Count()
|
||||
@ -211,12 +211,12 @@ func createTestAnnotations(t *testing.T, sqlstore *SQLStore, expectedCount int,
|
||||
a.Created = cutoffDate.AddDate(-10, 0, -10).UnixNano() / int64(time.Millisecond)
|
||||
}
|
||||
|
||||
_, err := sqlstore.NewSession().Insert(a)
|
||||
_, err := sqlstore.NewSession(context.Background()).Insert(a)
|
||||
require.NoError(t, err, "should be able to save annotation", err)
|
||||
|
||||
// mimick the SQL annotation Save logic by writing records to the annotation_tag table
|
||||
// we need to ensure they get deleted when we clean up annotations
|
||||
sess := sqlstore.NewSession()
|
||||
sess := sqlstore.NewSession(context.Background())
|
||||
for tagID := range []int{1, 2} {
|
||||
_, err = sess.Exec("INSERT INTO annotation_tag (annotation_id, tag_id) VALUES(?,?)", a.Id, tagID)
|
||||
require.NoError(t, err, "should be able to save annotation tag ID", err)
|
||||
|
@ -14,7 +14,10 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/lib/pq"
|
||||
"github.com/mattn/go-sqlite3"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
ol "github.com/opentracing/opentracing-go/log"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
cw "github.com/weaveworks/common/middleware"
|
||||
"xorm.io/core"
|
||||
)
|
||||
|
||||
@ -70,11 +73,39 @@ func (h *databaseQueryWrapper) Before(ctx context.Context, query string, args ..
|
||||
|
||||
// After hook will get the timestamp registered on the Before hook and print the elapsed time
|
||||
func (h *databaseQueryWrapper) After(ctx context.Context, query string, args ...interface{}) (context.Context, error) {
|
||||
h.instrument(ctx, "success", query, nil)
|
||||
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
func (h *databaseQueryWrapper) instrument(ctx context.Context, status string, query string, err error) {
|
||||
begin := ctx.Value(databaseQueryWrapperKey{}).(time.Time)
|
||||
elapsed := time.Since(begin)
|
||||
databaseQueryHistogram.WithLabelValues("success").Observe(elapsed.Seconds())
|
||||
h.log.Debug("query finished", "status", "success", "elapsed time", elapsed, "sql", query)
|
||||
return ctx, nil
|
||||
|
||||
histogram := databaseQueryHistogram.WithLabelValues(status)
|
||||
if traceID, ok := cw.ExtractSampledTraceID(ctx); ok {
|
||||
// Need to type-convert the Observer to an
|
||||
// ExemplarObserver. This will always work for a
|
||||
// HistogramVec.
|
||||
histogram.(prometheus.ExemplarObserver).ObserveWithExemplar(
|
||||
elapsed.Seconds(), prometheus.Labels{"traceID": traceID},
|
||||
)
|
||||
} else {
|
||||
histogram.Observe(elapsed.Seconds())
|
||||
}
|
||||
|
||||
span, _ := opentracing.StartSpanFromContext(ctx, "database query")
|
||||
defer span.Finish()
|
||||
|
||||
span.LogFields(
|
||||
ol.String("query", query),
|
||||
ol.String("status", status))
|
||||
|
||||
if err != nil {
|
||||
span.LogFields(ol.String("error", err.Error()))
|
||||
}
|
||||
|
||||
h.log.Debug("query finished", "status", status, "elapsed time", elapsed, "sql", query, "error", err)
|
||||
}
|
||||
|
||||
// OnError will be called if any error happens
|
||||
@ -85,10 +116,8 @@ func (h *databaseQueryWrapper) OnError(ctx context.Context, err error, query str
|
||||
status = "success"
|
||||
}
|
||||
|
||||
begin := ctx.Value(databaseQueryWrapperKey{}).(time.Time)
|
||||
elapsed := time.Since(begin)
|
||||
databaseQueryHistogram.WithLabelValues(status).Observe(elapsed.Seconds())
|
||||
h.log.Debug("query finished", "status", status, "elapsed time", elapsed, "sql", query, "error", err)
|
||||
h.instrument(ctx, status, query, err)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -19,12 +19,17 @@ func (sess *DBSession) publishAfterCommit(msg interface{}) {
|
||||
}
|
||||
|
||||
// NewSession returns a new DBSession
|
||||
func (ss *SQLStore) NewSession() *DBSession {
|
||||
return &DBSession{Session: ss.engine.NewSession()}
|
||||
func (ss *SQLStore) NewSession(ctx context.Context) *DBSession {
|
||||
sess := &DBSession{Session: ss.engine.NewSession()}
|
||||
sess.Session = sess.Session.Context(ctx)
|
||||
return sess
|
||||
}
|
||||
|
||||
func newSession() *DBSession {
|
||||
return &DBSession{Session: x.NewSession()}
|
||||
func newSession(ctx context.Context) *DBSession {
|
||||
sess := &DBSession{Session: x.NewSession()}
|
||||
sess.Session = sess.Session.Context(ctx)
|
||||
|
||||
return sess
|
||||
}
|
||||
|
||||
func startSession(ctx context.Context, engine *xorm.Engine, beginTran bool) (*DBSession, error) {
|
||||
@ -33,6 +38,7 @@ func startSession(ctx context.Context, engine *xorm.Engine, beginTran bool) (*DB
|
||||
sess, ok := value.(*DBSession)
|
||||
|
||||
if ok {
|
||||
sess.Session = sess.Session.Context(ctx)
|
||||
return sess, nil
|
||||
}
|
||||
|
||||
@ -43,6 +49,8 @@ func startSession(ctx context.Context, engine *xorm.Engine, beginTran bool) (*DB
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
newSess.Session = newSess.Session.Context(ctx)
|
||||
return newSess, nil
|
||||
}
|
||||
|
||||
@ -53,6 +61,7 @@ func (ss *SQLStore) WithDbSession(ctx context.Context, callback dbTransactionFun
|
||||
|
||||
func withDbSession(ctx context.Context, engine *xorm.Engine, callback dbTransactionFunc) error {
|
||||
sess := &DBSession{Session: engine.NewSession()}
|
||||
sess.Session = sess.Session.Context(ctx)
|
||||
defer sess.Close()
|
||||
|
||||
return callback(sess)
|
||||
|
@ -3,11 +3,11 @@
|
||||
package sqlstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
|
||||
func TestSavingTags(t *testing.T) {
|
||||
@ -20,7 +20,7 @@ func TestSavingTags(t *testing.T) {
|
||||
{Key: "server", Value: "server-1"},
|
||||
{Key: "error"},
|
||||
}
|
||||
tags, err := EnsureTagsExist(newSession(), tagPairs)
|
||||
tags, err := EnsureTagsExist(newSession(context.Background()), tagPairs)
|
||||
|
||||
So(err, ShouldBeNil)
|
||||
So(len(tags), ShouldEqual, 4)
|
||||
|
@ -530,6 +530,7 @@ func GetSignedInUser(ctx context.Context, query *models.GetSignedInUserQuery) er
|
||||
LEFT OUTER JOIN org on org.id = org_user.org_id `
|
||||
|
||||
sess := x.Table("user")
|
||||
sess = sess.Context(ctx)
|
||||
switch {
|
||||
case query.UserId > 0:
|
||||
sess.SQL(rawSQL+"WHERE u.id=?", query.UserId)
|
||||
|
Loading…
Reference in New Issue
Block a user