Tempo: Return new version of dataframe schema directly from the backend (#32116)

* Return dataframe directly from the backend

* Streamline some transforms

* Fix lint issues

* Remove unused lib

* Fix datasource test

* Fix imports and add some typings

* Fix the typings and some tests

* Add private doc comment

* Remove private tag

* Add comments

* Fix some API docs issues
Andrej Ocenas 2021-03-22 19:09:15 +01:00 committed by GitHub
parent f8ec947700
commit 3ef9cac640
15 changed files with 616 additions and 109 deletions

go.mod
View File

@@ -51,7 +51,6 @@ require (
github.com/hashicorp/go-version v1.2.1
github.com/inconshreveable/log15 v0.0.0-20180818164646-67afb5ed74ec
github.com/influxdata/influxdb-client-go/v2 v2.2.2
github.com/jaegertracing/jaeger v1.22.0
github.com/jmespath/go-jmespath v0.4.0
github.com/json-iterator/go v1.1.10
github.com/jung-kurt/gofpdf v1.16.2

View File

@@ -42,6 +42,12 @@ function parseOptionalMeta(str?: string): any {
return undefined;
}
/**
* Parses an arrow table into what is essentially a standard DataFrame. Use together with base64StringToArrowTable
* to get a data frame from a backend data source response.
*
* @public
*/
export function arrowTableToDataFrame(table: Table): ArrowDataFrame {
const fields: Field[] = [];
@@ -126,8 +132,8 @@ function toArrowVector(field: Field): ArrowVector {
}
/**
* @param keepOriginalNames by default, the exported Table will get names that match the
* display within grafana. This typically includes any labels defined in the metadata.
* @param keepOriginalNames - by default, the exported Table will get names that match the
* display within grafana. This typically includes any labels defined in the metadata.
*
* When using this function to round-trip data, be sure to set `keepOriginalNames=true`
*/
@@ -173,6 +179,21 @@ export function grafanaDataFrameToArrowTable(data: DataFrame, keepOriginalNames?
return table;
}
/**
* Turns an arrow table into a base64-encoded string. This is symmetrical to base64StringToArrowTable and can be
* used to simulate a response from the backend.
*
* @public
*/
export function arrowTableToBase64String(table: Table): string {
const binstring = Array.prototype.map
.call(table.serialize() as Uint8Array, (ch: number) => {
return String.fromCharCode(ch);
})
.join('');
return btoa(binstring);
}
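Together with base64StringToArrowTable and arrowTableToDataFrame (documented above), this gives a full round trip. A minimal sketch, assuming all four helpers are exported from `@grafana/data` (the frame contents here are made up):

```ts
import {
  arrowTableToBase64String,
  arrowTableToDataFrame,
  base64StringToArrowTable,
  grafanaDataFrameToArrowTable,
  MutableDataFrame,
} from '@grafana/data';

// Encode a frame the way a simulated backend response would carry it...
const frame = new MutableDataFrame({ fields: [{ name: 'value', values: [1, 2, 3] }] });
const encoded = arrowTableToBase64String(grafanaDataFrameToArrowTable(frame));

// ...then decode it back into a data frame on the other side.
const decoded = arrowTableToDataFrame(base64StringToArrowTable(encoded));
```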
function updateArrowTableNames(table: Table, frame: DataFrame): Table | undefined {
const cols: Column[] = [];
for (let i = 0; i < table.numCols; i++) {

View File

@@ -20,6 +20,7 @@ export enum DataTopic {
Annotations = 'annotations',
}
// Should be kept in sync with grafana-plugin-sdk-go/data/frame_meta.go
export type PreferredVisualisationType = 'graph' | 'table' | 'logs' | 'trace' | 'nodeGraph';
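The Tempo datasource later in this diff uses the 'trace' variant through frame meta; a small sketch of the pattern:

```ts
import { MutableDataFrame } from '@grafana/data';

// Declare how a frame prefers to be visualised via its meta.
const frame = new MutableDataFrame({
  fields: [{ name: 'traceID', values: [] }],
  meta: { preferredVisualisationType: 'trace' },
});
```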
/**

View File

@@ -6,6 +6,11 @@ declare global {
namespace jest {
interface Matchers<R, T = {}> {
toEmitValues<E = ObservableType<T>>(expected: E[]): Promise<CustomMatcherResult>;
/**
* Collects all the values emitted by the observable (including errors) and passes them to the expectations
* function after the observable has completed (or errored). If the observable does not complete within
* OBSERVABLE_TEST_TIMEOUT_IN_MS, the test fails.
*/
toEmitValuesWith<E = ObservableType<T>>(expectations: (received: E[]) => void): Promise<CustomMatcherResult>;
}
}
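A minimal usage sketch (hypothetical test; assumes the matchers are registered via expect.extend in the Jest setup):

```ts
import { of } from 'rxjs';

it('emits every value and completes', async () => {
  // The matcher resolves once the observable completes or errors.
  await expect(of(1, 2, 3)).toEmitValuesWith((received) => {
    expect(received).toEqual([1, 2, 3]);
  });
});
```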

View File

@@ -11,7 +11,13 @@ export { loadPluginCss, SystemJS, PluginCssOptions } from './utils/plugin';
export { reportMetaAnalytics } from './utils/analytics';
export { logInfo, logDebug, logWarning, logError } from './utils/logging';
export { DataSourceWithBackend, HealthCheckResult, HealthStatus } from './utils/DataSourceWithBackend';
export { toDataQueryError, toDataQueryResponse, frameToMetricFindValue } from './utils/queryResponse';
export {
toDataQueryError,
toDataQueryResponse,
frameToMetricFindValue,
BackendDataSourceResponse,
DataResponse,
} from './utils/queryResponse';
export { PanelRenderer, PanelRendererProps, PanelRendererType, setPanelRenderer } from './components/PanelRenderer';
export { setQueryRunnerFactory, createQueryRunner, QueryRunnerFactory } from './services/QueryRunner';
export { DataSourcePicker, DataSourcePickerProps, DataSourcePickerState } from './components/DataSourcePicker';

View File

@@ -10,7 +10,7 @@ import {
import { Observable, of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
import { getBackendSrv, getDataSourceSrv } from '../services';
import { toDataQueryResponse } from './queryResponse';
import { BackendDataSourceResponse, toDataQueryResponse } from './queryResponse';
const ExpressionDatasourceID = '__expr__';
@@ -104,14 +104,14 @@ export class DataSourceWithBackend<
}
return getBackendSrv()
.fetch({
.fetch<BackendDataSourceResponse>({
url: '/api/ds/query',
method: 'POST',
data: body,
requestId,
})
.pipe(
map((rsp: any) => {
map((rsp) => {
return toDataQueryResponse(rsp, queries as DataQuery[]);
}),
catchError((err) => {

View File

@@ -7,16 +7,12 @@ const resp = {
results: {
A: {
refId: 'A',
series: null,
tables: null,
dataframes: [
'QVJST1cxAAD/////cAEAABAAAAAAAAoADgAMAAsABAAKAAAAFAAAAAAAAAEDAAoADAAAAAgABAAKAAAACAAAAFAAAAACAAAAKAAAAAQAAAAg////CAAAAAwAAAABAAAAQQAAAAUAAAByZWZJZAAAAED///8IAAAADAAAAAAAAAAAAAAABAAAAG5hbWUAAAAAAgAAAHwAAAAEAAAAnv///xQAAABAAAAAQAAAAAAAAwFAAAAAAQAAAAQAAACM////CAAAABQAAAAIAAAAQS1zZXJpZXMAAAAABAAAAG5hbWUAAAAAAAAAAIb///8AAAIACAAAAEEtc2VyaWVzAAASABgAFAATABIADAAAAAgABAASAAAAFAAAAEQAAABMAAAAAAAKAUwAAAABAAAADAAAAAgADAAIAAQACAAAAAgAAAAQAAAABAAAAHRpbWUAAAAABAAAAG5hbWUAAAAAAAAAAAAABgAIAAYABgAAAAAAAwAEAAAAdGltZQAAAAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAUABMADAAEAAwAAABgAAAAAAAAABQAAAAAAAADAwAKABgADAAIAAQACgAAABQAAABYAAAABgAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADAAAAAAAAAAMAAAAAAAAAAAAAAAAAAAADAAAAAAAAAAMAAAAAAAAAAAAAAAAgAAAAYAAAAAAAAAAAAAAAAAAAAGAAAAAAAAAAAAAAAAAAAAQMC/OcElXhZAOAEFxCVeFkCwQtDGJV4WQCiEm8klXhZAoMVmzCVeFkAYBzLPJV4WAAAAAAAA8D8AAAAAAAA0QAAAAAAAgFZAAAAAAAAAPkAAAAAAAAAUQAAAAAAAAAAAEAAAAAwAFAASAAwACAAEAAwAAAAQAAAALAAAADgAAAAAAAMAAQAAAIABAAAAAAAAwAAAAAAAAABgAAAAAAAAAAAAAAAAAAAAAAAKAAwAAAAIAAQACgAAAAgAAABQAAAAAgAAACgAAAAEAAAAIP///wgAAAAMAAAAAQAAAEEAAAAFAAAAcmVmSWQAAABA////CAAAAAwAAAAAAAAAAAAAAAQAAABuYW1lAAAAAAIAAAB8AAAABAAAAJ7///8UAAAAQAAAAEAAAAAAAAMBQAAAAAEAAAAEAAAAjP///wgAAAAUAAAACAAAAEEtc2VyaWVzAAAAAAQAAABuYW1lAAAAAAAAAACG////AAACAAgAAABBLXNlcmllcwAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABEAAAATAAAAAAACgFMAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAQAAAB0aW1lAAAAAAQAAABuYW1lAAAAAAAAAAAAAAYACAAGAAYAAAAAAAMABAAAAHRpbWUAAAAAmAEAAEFSUk9XMQ==',
],
},
B: {
refId: 'B',
series: null,
tables: null,
dataframes: [
'QVJST1cxAAD/////cAEAABAAAAAAAAoADgAMAAsABAAKAAAAFAAAAAAAAAEDAAoADAAAAAgABAAKAAAACAAAAFAAAAACAAAAKAAAAAQAAAAg////CAAAAAwAAAABAAAAQgAAAAUAAAByZWZJZAAAAED///8IAAAADAAAAAAAAAAAAAAABAAAAG5hbWUAAAAAAgAAAHwAAAAEAAAAnv///xQAAABAAAAAQAAAAAAAAwFAAAAAAQAAAAQAAACM////CAAAABQAAAAIAAAAQi1zZXJpZXMAAAAABAAAAG5hbWUAAAAAAAAAAIb///8AAAIACAAAAEItc2VyaWVzAAASABgAFAATABIADAAAAAgABAASAAAAFAAAAEQAAABMAAAAAAAKAUwAAAABAAAADAAAAAgADAAIAAQACAAAAAgAAAAQAAAABAAAAHRpbWUAAAAABAAAAG5hbWUAAAAAAAAAAAAABgAIAAYABgAAAAAAAwAEAAAAdGltZQAAAAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAUABMADAAEAAwAAABgAAAAAAAAABQAAAAAAAADAwAKABgADAAIAAQACgAAABQAAABYAAAABgAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADAAAAAAAAAAMAAAAAAAAAAAAAAAAAAAADAAAAAAAAAAMAAAAAAAAAAAAAAAAgAAAAYAAAAAAAAAAAAAAAAAAAAGAAAAAAAAAAAAAAAAAAAAQMC/OcElXhZAOAEFxCVeFkCwQtDGJV4WQCiEm8klXhZAoMVmzCVeFkAYBzLPJV4WAAAAAAAA8D8AAAAAAAA0QAAAAAAAgFZAAAAAAAAAPkAAAAAAAAAUQAAAAAAAAAAAEAAAAAwAFAASAAwACAAEAAwAAAAQAAAALAAAADgAAAAAAAMAAQAAAIABAAAAAAAAwAAAAAAAAABgAAAAAAAAAAAAAAAAAAAAAAAKAAwAAAAIAAQACgAAAAgAAABQAAAAAgAAACgAAAAEAAAAIP///wgAAAAMAAAAAQAAAEIAAAAFAAAAcmVmSWQAAABA////CAAAAAwAAAAAAAAAAAAAAAQAAABuYW1lAAAAAAIAAAB8AAAABAAAAJ7///8UAAAAQAAAAEAAAAAAAAMBQAAAAAEAAAAEAAAAjP///wgAAAAUAAAACAAAAEItc2VyaWVzAAAAAAQAAABuYW1lAAAAAAAAAACG////AAACAAgAAABCLXNlcmllcwAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABEAAAATAAAAAAACgFMAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAQAAAB0aW1lAAAAAAQAAABuYW1lAAAAAAAAAAAAAAYACAAGAAYAAAAAAAMABAAAAHRpbWUAAAAAmAEAAEFSUk9XMQ==',
],
@@ -30,8 +26,6 @@ const resWithError = {
results: {
A: {
error: 'Hello Error',
series: null,
tables: null,
dataframes: [
'QVJST1cxAAD/////WAEAABAAAAAAAAoADgAMAAsABAAKAAAAFAAAAAAAAAEDAAoADAAAAAgABAAKAAAACAAAAJwAAAADAAAATAAAACgAAAAEAAAAPP///wgAAAAMAAAAAAAAAAAAAAAFAAAAcmVmSWQAAABc////CAAAAAwAAAAAAAAAAAAAAAQAAABuYW1lAAAAAHz///8IAAAANAAAACoAAAB7Im5vdGljZXMiOlt7InNldmVyaXR5IjoyLCJ0ZXh0IjoiVGV4dCJ9XX0AAAQAAABtZXRhAAAAAAEAAAAYAAAAAAASABgAFAAAABMADAAAAAgABAASAAAAFAAAAEQAAABMAAAAAAAAA0wAAAABAAAADAAAAAgADAAIAAQACAAAAAgAAAAQAAAABwAAAG51bWJlcnMABAAAAG5hbWUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAHAAAAbnVtYmVycwAAAAAA/////4gAAAAUAAAAAAAAAAwAFgAUABMADAAEAAwAAAAQAAAAAAAAABQAAAAAAAADAwAKABgADAAIAAQACgAAABQAAAA4AAAAAgAAAAAAAAAAAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAEAAAACAAAAAAAAAAAAAAAAAAAAAAAAAAAA8D8AAAAAAAAIQBAAAAAMABQAEgAMAAgABAAMAAAAEAAAACwAAAA4AAAAAAADAAEAAABoAQAAAAAAAJAAAAAAAAAAEAAAAAAAAAAAAAAAAAAAAAAACgAMAAAACAAEAAoAAAAIAAAAnAAAAAMAAABMAAAAKAAAAAQAAAA8////CAAAAAwAAAAAAAAAAAAAAAUAAAByZWZJZAAAAFz///8IAAAADAAAAAAAAAAAAAAABAAAAG5hbWUAAAAAfP///wgAAAA0AAAAKgAAAHsibm90aWNlcyI6W3sic2V2ZXJpdHkiOjIsInRleHQiOiJUZXh0In1dfQAABAAAAG1ldGEAAAAAAQAAABgAAAAAABIAGAAUAAAAEwAMAAAACAAEABIAAAAUAAAARAAAAEwAAAAAAAADTAAAAAEAAAAMAAAACAAMAAgABAAIAAAACAAAABAAAAAHAAAAbnVtYmVycwAEAAAAbmFtZQAAAAAAAAAAAAAGAAgABgAGAAAAAAACAAcAAABudW1iZXJzAIABAABBUlJPVzE=',
],
@@ -41,7 +35,7 @@ const resWithError = {
};
const emptyResults = {
data: { '': { refId: '', meta: null, series: null, tables: null, dataframes: null } },
data: { results: { '': { refId: '' } } },
};
/* eslint-enable */
@@ -226,19 +220,13 @@ describe('Query Response parser', () => {
data: {
results: {
X: {
series: [
{ name: 'Requests/s', points: [[13.594958983547151, 1611839862951]], tables: null, dataframes: null },
],
series: [{ name: 'Requests/s', points: [[13.594958983547151, 1611839862951]] }] as any,
},
B: {
series: [
{ name: 'Requests/s', points: [[13.594958983547151, 1611839862951]], tables: null, dataframes: null },
],
series: [{ name: 'Requests/s', points: [[13.594958983547151, 1611839862951]] }] as any,
},
A: {
series: [
{ name: 'Requests/s', points: [[13.594958983547151, 1611839862951]], tables: null, dataframes: null },
],
series: [{ name: 'Requests/s', points: [[13.594958983547151, 1611839862951]] }] as any,
},
},
},

View File

@@ -13,15 +13,33 @@ import {
FieldType,
DataQuery,
} from '@grafana/data';
import { FetchResponse } from '../services';
interface DataResponse {
/**
* A single response object from a backend data source. Properties are optional, but a response should contain at
* least an error or some data (it can contain both). The main way to send data is the dataframes attribute; the
* series and tables attributes are legacy formats.
*
* @internal
*/
export interface DataResponse {
error?: string;
refId?: string;
// base64 encoded arrow tables
dataframes?: string[];
series?: TimeSeries[];
tables?: TableData[];
}
/**
* This is the type of response expected from a backend data source.
*
* @internal
*/
export interface BackendDataSourceResponse {
results: KeyValue<DataResponse>;
}
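For reference, a sketch of the wire shape and how it is parsed (the refIds and error text are hypothetical; real dataframes entries are full base64-encoded Arrow payloads like the fixtures in the tests above, truncated here):

```ts
const resp: BackendDataSourceResponse = {
  results: {
    A: { refId: 'A', dataframes: ['QVJST1cxAAD/...'] },
    B: { refId: 'B', error: 'something went wrong' },
  },
};

// Parsed the same way DataSourceWithBackend handles /api/ds/query responses:
const dqr = toDataQueryResponse({ data: resp }, [{ refId: 'A' }, { refId: 'B' }] as DataQuery[]);
```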
/**
* Parse the results from /api/ds/query into a DataQueryResponse
*
@@ -30,17 +48,24 @@ interface DataResponse {
*
* @public
*/
export function toDataQueryResponse(res: any, queries?: DataQuery[]): DataQueryResponse {
export function toDataQueryResponse(
res:
| { data: BackendDataSourceResponse | undefined }
| FetchResponse<BackendDataSourceResponse | undefined>
| DataQueryError,
queries?: DataQuery[]
): DataQueryResponse {
const rsp: DataQueryResponse = { data: [], state: LoadingState.Done };
if (res.data?.results) {
const results: KeyValue = res.data.results;
// If the response isn't in the correct shape we just ignore the data and return an empty DataQueryResponse.
if ((res as FetchResponse).data?.results) {
const results = (res as FetchResponse).data.results;
const resultIDs = Object.keys(results);
const refIDs = queries ? queries.map((q) => q.refId) : resultIDs;
const usedResultIDs = new Set<string>(resultIDs);
const data: DataResponse[] = [];
for (const refId of refIDs) {
const dr = results[refId] as DataResponse;
const dr = results[refId];
if (!dr) {
continue;
}
@@ -52,7 +77,7 @@ export function toDataQueryResponse(res: any, queries?: DataQuery[]): DataQueryResponse {
// Add any refIds that do not match the query targets
if (usedResultIDs.size) {
for (const refId of usedResultIDs) {
const dr = results[refId] as DataResponse;
const dr = results[refId];
if (!dr) {
continue;
}
@@ -110,12 +135,12 @@ export function toDataQueryResponse(res: any, queries?: DataQuery[]): DataQueryResponse {
}
// When it is not an OK response, make sure the error gets added
if (res.status && res.status !== 200) {
if ((res as FetchResponse).status && (res as FetchResponse).status !== 200) {
if (rsp.state !== LoadingState.Error) {
rsp.state = LoadingState.Error;
}
if (!rsp.error) {
rsp.error = toDataQueryError(res);
rsp.error = toDataQueryError(res as DataQueryError);
}
}
@@ -128,7 +153,7 @@ export function toDataQueryResponse(res: any, queries?: DataQuery[]): DataQueryResponse {
*
* @public
*/
export function toDataQueryError(err: any): DataQueryError {
export function toDataQueryError(err: DataQueryError | string | Object): DataQueryError {
const error = (err || {}) as DataQueryError;
if (!error.message) {

View File

@@ -2,7 +2,6 @@ package tempo
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
@@ -12,11 +11,7 @@ import (
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
jaeger "github.com/jaegertracing/jaeger/model"
jaeger_json "github.com/jaegertracing/jaeger/model/converter/json"
ot_pdata "go.opentelemetry.io/collector/consumer/pdata"
ot_jaeger "go.opentelemetry.io/collector/translator/trace/jaeger"
)
type tempoExecutor struct {
@@ -77,42 +72,17 @@ func (e *tempoExecutor) DataQuery(ctx context.Context, dsInfo *models.DataSource
otTrace := ot_pdata.NewTraces()
err = otTrace.FromOtlpProtoBytes(body)
if err != nil {
return plugins.DataResponse{}, fmt.Errorf("failed to convert tempo response to Otlp: %w", err)
}
jaegerBatches, err := ot_jaeger.InternalTracesToJaegerProto(otTrace)
frame, err := TraceToFrame(otTrace)
if err != nil {
return plugins.DataResponse{}, fmt.Errorf("failed to translate to jaegerBatches %v: %w", traceID, err)
}
jaegerTrace := &jaeger.Trace{
Spans: []*jaeger.Span{},
ProcessMap: []jaeger.Trace_ProcessMapping{},
}
// otel proto conversion doesn't set jaeger processes
for _, batch := range jaegerBatches {
for _, s := range batch.Spans {
s.Process = batch.Process
}
jaegerTrace.Spans = append(jaegerTrace.Spans, batch.Spans...)
jaegerTrace.ProcessMap = append(jaegerTrace.ProcessMap, jaeger.Trace_ProcessMapping{
Process: *batch.Process,
ProcessID: batch.Process.ServiceName,
})
}
jsonTrace := jaeger_json.FromDomain(jaegerTrace)
traceBytes, err := json.Marshal(jsonTrace)
if err != nil {
return plugins.DataResponse{}, fmt.Errorf("failed to json.Marshal trace \"%s\" :%w", traceID, err)
}
frames := []*data.Frame{
{Name: "Traces", RefID: refID, Fields: []*data.Field{data.NewField("trace", nil, []string{string(traceBytes)})}},
return plugins.DataResponse{}, fmt.Errorf("failed to transform trace %v to data frame: %w", traceID, err)
}
frame.RefID = refID
frames := []*data.Frame{frame}
queryResult.Dataframes = plugins.NewDecodedDataFrames(frames)
return plugins.DataResponse{

Binary file not shown.

View File

@@ -0,0 +1,341 @@
package tempo
import (
"encoding/json"
"fmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
)
type KeyValue struct {
Value interface{} `json:"value"`
Key string `json:"key"`
}
type TraceLog struct {
// Millisecond epoch time
Timestamp float64 `json:"timestamp"`
Fields []*KeyValue `json:"fields"`
}
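// TraceToFrame converts an OpenTelemetry pdata.Traces structure into a single Grafana data frame with one row per span.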
func TraceToFrame(td pdata.Traces) (*data.Frame, error) {
// In the OpenTelemetry format, spans are grouped first by the resource/service they originated in, and within
// that resource they are grouped by the instrumentation library that created them.
resourceSpans := td.ResourceSpans()
if resourceSpans.Len() == 0 {
return nil, nil
}
frame := &data.Frame{
Name: "Trace",
Fields: []*data.Field{
data.NewField("traceID", nil, []string{}),
data.NewField("spanID", nil, []string{}),
data.NewField("parentSpanID", nil, []string{}),
data.NewField("operationName", nil, []string{}),
data.NewField("serviceName", nil, []string{}),
data.NewField("serviceTags", nil, []string{}),
data.NewField("startTime", nil, []float64{}),
data.NewField("duration", nil, []float64{}),
data.NewField("logs", nil, []string{}),
data.NewField("tags", nil, []string{}),
},
Meta: &data.FrameMeta{
// TODO: use constant once available in the SDK
PreferredVisualization: "trace",
},
}
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
rows, err := resourceSpansToRows(rs)
if err != nil {
return nil, err
}
for _, row := range rows {
frame.AppendRow(row...)
}
}
return frame, nil
}
// resourceSpansToRows processes all the spans for a particular resource/service
func resourceSpansToRows(rs pdata.ResourceSpans) ([][]interface{}, error) {
resource := rs.Resource()
ilss := rs.InstrumentationLibrarySpans()
if resource.Attributes().Len() == 0 || ilss.Len() == 0 {
return [][]interface{}{}, nil
}
// Approximate the number of spans as the number of spans in the first
// instrumentation library.
rows := make([][]interface{}, 0, ilss.At(0).Spans().Len())
for i := 0; i < ilss.Len(); i++ {
ils := ilss.At(i)
// These are finally the actual spans
spans := ils.Spans()
for j := 0; j < spans.Len(); j++ {
span := spans.At(j)
row, err := spanToSpanRow(span, ils.InstrumentationLibrary(), resource)
if err != nil {
return nil, err
}
if row != nil {
rows = append(rows, row)
}
}
}
return rows, nil
}
func spanToSpanRow(span pdata.Span, libraryTags pdata.InstrumentationLibrary, resource pdata.Resource) ([]interface{}, error) {
traceID, err := traceIDToString(span.TraceID())
if err != nil {
return nil, err
}
spanID, err := spanIDToString(span.SpanID())
if err != nil {
return nil, err
}
// We should get an error only if the span ID is empty, in which case an empty string is fine
parentSpanID, _ := spanIDToString(span.ParentSpanID())
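// pdata timestamps are nanosecond epoch values; convert to millisecond epoch floats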
startTime := float64(span.StartTime()) / 1_000_000
serviceName, serviceTags := resourceToProcess(resource)
serviceTagsJson, err := json.Marshal(serviceTags)
if err != nil {
return nil, fmt.Errorf("failed to marshal service tags: %w", err)
}
spanTags, err := json.Marshal(getSpanTags(span, libraryTags))
if err != nil {
return nil, fmt.Errorf("failed to marshal span tags: %w", err)
}
logs, err := json.Marshal(spanEventsToLogs(span.Events()))
if err != nil {
return nil, fmt.Errorf("failed to marshal span logs: %w", err)
}
return []interface{}{
traceID,
spanID,
parentSpanID,
span.Name(),
serviceName,
toJSONString(serviceTagsJson),
startTime,
float64(span.EndTime()-span.StartTime()) / 1_000_000,
toJSONString(logs),
toJSONString(spanTags),
}, nil
}
func toJSONString(json []byte) string {
s := string(json)
if s == "null" {
return ""
}
return s
}
// A TraceID can be the size of two uint64s in OT, but we just need a string
func traceIDToString(traceID pdata.TraceID) (string, error) {
traceIDHigh, traceIDLow := tracetranslator.TraceIDToUInt64Pair(traceID)
if traceIDLow == 0 && traceIDHigh == 0 {
return "", fmt.Errorf("OC span has an all zeros trace ID")
}
return fmt.Sprintf("%d%d", traceIDHigh, traceIDLow), nil
}
func spanIDToString(spanID pdata.SpanID) (string, error) {
uSpanID := tracetranslator.SpanIDToUInt64(spanID)
if uSpanID == 0 {
return "", fmt.Errorf("OC span has an all zeros span ID")
}
return fmt.Sprintf("%d", uSpanID), nil
}
func resourceToProcess(resource pdata.Resource) (string, []*KeyValue) {
attrs := resource.Attributes()
serviceName := tracetranslator.ResourceNoServiceName
if attrs.Len() == 0 {
return serviceName, nil
}
tags := make([]*KeyValue, 0, attrs.Len()-1)
attrs.ForEach(func(key string, attr pdata.AttributeValue) {
if key == conventions.AttributeServiceName {
serviceName = attr.StringVal()
}
tags = append(tags, &KeyValue{Key: key, Value: getAttributeVal(attr)})
})
return serviceName, tags
}
func getAttributeVal(attr pdata.AttributeValue) interface{} {
switch attr.Type() {
case pdata.AttributeValueSTRING:
return attr.StringVal()
case pdata.AttributeValueINT:
return attr.IntVal()
case pdata.AttributeValueBOOL:
return attr.BoolVal()
case pdata.AttributeValueDOUBLE:
return attr.DoubleVal()
case pdata.AttributeValueMAP, pdata.AttributeValueARRAY:
return tracetranslator.AttributeValueToString(attr, false)
default:
return nil
}
}
func getSpanTags(span pdata.Span, instrumentationLibrary pdata.InstrumentationLibrary) []*KeyValue {
var tags []*KeyValue
libraryTags := getTagsFromInstrumentationLibrary(instrumentationLibrary)
if libraryTags != nil {
tags = append(tags, libraryTags...)
}
span.Attributes().ForEach(func(key string, attr pdata.AttributeValue) {
tags = append(tags, &KeyValue{Key: key, Value: getAttributeVal(attr)})
})
status := span.Status()
possibleNilTags := []*KeyValue{
getTagFromSpanKind(span.Kind()),
getTagFromStatusCode(status.Code()),
getErrorTagFromStatusCode(status.Code()),
getTagFromStatusMsg(status.Message()),
getTagFromTraceState(span.TraceState()),
}
for _, tag := range possibleNilTags {
if tag != nil {
tags = append(tags, tag)
}
}
return tags
}
func getTagsFromInstrumentationLibrary(il pdata.InstrumentationLibrary) []*KeyValue {
var keyValues []*KeyValue
if ilName := il.Name(); ilName != "" {
kv := &KeyValue{
Key: conventions.InstrumentationLibraryName,
Value: ilName,
}
keyValues = append(keyValues, kv)
}
if ilVersion := il.Version(); ilVersion != "" {
kv := &KeyValue{
Key: conventions.InstrumentationLibraryVersion,
Value: ilVersion,
}
keyValues = append(keyValues, kv)
}
return keyValues
}
func getTagFromSpanKind(spanKind pdata.SpanKind) *KeyValue {
var tagStr string
switch spanKind {
case pdata.SpanKindCLIENT:
tagStr = string(tracetranslator.OpenTracingSpanKindClient)
case pdata.SpanKindSERVER:
tagStr = string(tracetranslator.OpenTracingSpanKindServer)
case pdata.SpanKindPRODUCER:
tagStr = string(tracetranslator.OpenTracingSpanKindProducer)
case pdata.SpanKindCONSUMER:
tagStr = string(tracetranslator.OpenTracingSpanKindConsumer)
case pdata.SpanKindINTERNAL:
tagStr = string(tracetranslator.OpenTracingSpanKindInternal)
default:
return nil
}
return &KeyValue{
Key: tracetranslator.TagSpanKind,
Value: tagStr,
}
}
func getTagFromStatusCode(statusCode pdata.StatusCode) *KeyValue {
return &KeyValue{
Key: tracetranslator.TagStatusCode,
Value: int64(statusCode),
}
}
func getErrorTagFromStatusCode(statusCode pdata.StatusCode) *KeyValue {
if statusCode == pdata.StatusCodeError {
return &KeyValue{
Key: tracetranslator.TagError,
Value: true,
}
}
return nil
}
func getTagFromStatusMsg(statusMsg string) *KeyValue {
if statusMsg == "" {
return nil
}
return &KeyValue{
Key: tracetranslator.TagStatusMsg,
Value: statusMsg,
}
}
func getTagFromTraceState(traceState pdata.TraceState) *KeyValue {
if traceState != pdata.TraceStateEmpty {
return &KeyValue{
Key: tracetranslator.TagW3CTraceState,
Value: string(traceState),
}
}
return nil
}
func spanEventsToLogs(events pdata.SpanEventSlice) []*TraceLog {
if events.Len() == 0 {
return nil
}
logs := make([]*TraceLog, 0, events.Len())
for i := 0; i < events.Len(); i++ {
event := events.At(i)
fields := make([]*KeyValue, 0, event.Attributes().Len()+1)
if event.Name() != "" {
fields = append(fields, &KeyValue{
Key: tracetranslator.TagMessage,
Value: event.Name(),
})
}
event.Attributes().ForEach(func(key string, attr pdata.AttributeValue) {
fields = append(fields, &KeyValue{Key: key, Value: getAttributeVal(attr)})
})
logs = append(logs, &TraceLog{
Timestamp: float64(event.Timestamp()) / 1_000_000,
Fields: fields,
})
}
return logs
}

View File

@@ -0,0 +1,111 @@
package tempo
import (
"io/ioutil"
"testing"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/require"
ot_pdata "go.opentelemetry.io/collector/consumer/pdata"
)
func TestTraceToFrame(t *testing.T) {
t.Run("should transform tempo protobuf response into dataframe", func(t *testing.T) {
// For whatever reason, you cannot easily create pdata.Traces for TraceToFrame from something more readable
// like JSON. You could tediously create the structures manually using all the setters, or use
// https://github.com/grafana/tempo/tree/master/pkg/tempopb to create the protobuf structs from something like
// JSON. For now, saving a real Tempo proto response into a file and loading it here was the easiest approach,
// and it is good enough.
proto, err := ioutil.ReadFile("testData/tempo_proto_response")
require.NoError(t, err)
otTrace := ot_pdata.NewTraces()
err = otTrace.FromOtlpProtoBytes(proto)
require.NoError(t, err)
frame, err := TraceToFrame(otTrace)
require.NoError(t, err)
require.Equal(t, 30, frame.Rows())
require.ElementsMatch(t, fields, fieldNames(frame))
bFrame := &BetterFrame{frame}
root := rootSpan(bFrame)
require.NotNil(t, root)
require.Equal(t, "HTTP GET - loki_api_v1_query_range", root["operationName"])
require.Equal(t, "loki-all", root["serviceName"])
require.Equal(t, "[{\"value\":\"loki-all\",\"key\":\"service.name\"},{\"value\":\"Jaeger-Go-2.25.0\",\"key\":\"opencensus.exporterversion\"},{\"value\":\"4d019a031941\",\"key\":\"host.hostname\"},{\"value\":\"172.18.0.6\",\"key\":\"ip\"},{\"value\":\"4b19ace06df8e4de\",\"key\":\"client-uuid\"}]", root["serviceTags"])
require.Equal(t, 1616072924070.497, root["startTime"])
require.Equal(t, 8.421, root["duration"])
require.Equal(t, "", root["logs"])
require.Equal(t, "[{\"value\":\"const\",\"key\":\"sampler.type\"},{\"value\":true,\"key\":\"sampler.param\"},{\"value\":200,\"key\":\"http.status_code\"},{\"value\":\"GET\",\"key\":\"http.method\"},{\"value\":\"/loki/api/v1/query_range?direction=BACKWARD\\u0026limit=1000\\u0026query=%7Bcompose_project%3D%22devenv%22%7D%20%7C%3D%22traceID%22\\u0026start=1616070921000000000\\u0026end=1616072722000000000\\u0026step=2\",\"key\":\"http.url\"},{\"value\":\"net/http\",\"key\":\"component\"},{\"value\":\"server\",\"key\":\"span.kind\"},{\"value\":0,\"key\":\"status.code\"}]", root["tags"])
span := bFrame.FindRowWithValue("spanID", "8185345640360084998")
require.Equal(t, "GetParallelChunks", span["operationName"])
require.Equal(t, "loki-all", span["serviceName"])
require.Equal(t, "[{\"value\":\"loki-all\",\"key\":\"service.name\"},{\"value\":\"Jaeger-Go-2.25.0\",\"key\":\"opencensus.exporterversion\"},{\"value\":\"4d019a031941\",\"key\":\"host.hostname\"},{\"value\":\"172.18.0.6\",\"key\":\"ip\"},{\"value\":\"4b19ace06df8e4de\",\"key\":\"client-uuid\"}]", span["serviceTags"])
require.Equal(t, 1616072924072.852, span["startTime"])
require.Equal(t, 0.094, span["duration"])
require.Equal(t, "[{\"timestamp\":1616072924072.856,\"fields\":[{\"value\":1,\"key\":\"chunks requested\"}]},{\"timestamp\":1616072924072.9448,\"fields\":[{\"value\":1,\"key\":\"chunks fetched\"}]}]", span["logs"])
require.Equal(t, "[{\"value\":0,\"key\":\"status.code\"}]", span["tags"])
})
}
type Row map[string]interface{}
type BetterFrame struct {
frame *data.Frame
}
func (f *BetterFrame) GetRow(index int) Row {
row := f.frame.RowCopy(index)
betterRow := make(map[string]interface{})
for i, field := range row {
betterRow[f.frame.Fields[i].Name] = field
}
return betterRow
}
func (f *BetterFrame) FindRow(fn func(row Row) bool) Row {
for i := 0; i < f.frame.Rows(); i++ {
row := f.GetRow(i)
if fn(row) {
return row
}
}
return nil
}
func (f *BetterFrame) FindRowWithValue(fieldName string, value interface{}) Row {
return f.FindRow(func(row Row) bool {
return row[fieldName] == value
})
}
func rootSpan(frame *BetterFrame) Row {
return frame.FindRowWithValue("parentSpanID", "")
}
func fieldNames(frame *data.Frame) []string {
var names []string
for _, f := range frame.Fields {
names = append(names, f.Name)
}
return names
}
var fields = []string{
"traceID",
"spanID",
"parentSpanID",
"operationName",
"serviceName",
"serviceTags",
"startTime",
"duration",
"logs",
"tags",
}

View File

@@ -1,39 +1,65 @@
import { DataSourceInstanceSettings, FieldType, MutableDataFrame, PluginType } from '@grafana/data';
import { backendSrv } from 'app/core/services/backend_srv';
import { of } from 'rxjs';
import {
arrowTableToBase64String,
DataFrame,
DataSourceInstanceSettings,
grafanaDataFrameToArrowTable,
MutableDataFrame,
PluginType,
} from '@grafana/data';
import { Observable, of } from 'rxjs';
import { createFetchResponse } from 'test/helpers/createFetchResponse';
import { TempoDatasource } from './datasource';
jest.mock('../../../../../packages/grafana-runtime/src/services/backendSrv.ts', () => ({
getBackendSrv: () => backendSrv,
}));
jest.mock('../../../../../packages/grafana-runtime/src/utils/queryResponse.ts', () => ({
toDataQueryResponse: (resp: any) => resp,
}));
import { FetchResponse, setBackendSrv, BackendDataSourceResponse } from '@grafana/runtime';
describe('Tempo data source', () => {
beforeEach(() => {
jest.clearAllMocks();
});
it('returns trace when queried', async () => {
const responseDataFrame = new MutableDataFrame({ fields: [{ name: 'trace', values: ['{}'] }] });
setupBackendSrv([responseDataFrame]);
it('parses json fields from backend', async () => {
setupBackendSrv(
new MutableDataFrame({
fields: [
{ name: 'serviceTags', values: ['{"key":"servicetag1","value":"service"}'] },
{ name: 'logs', values: ['{"timestamp":12345,"fields":[{"key":"count","value":1}]}'] },
{ name: 'tags', values: ['{"key":"tag1","value":"val1"}'] },
{ name: 'serviceName', values: ['service'] },
],
})
);
const ds = new TempoDatasource(defaultSettings);
await expect(ds.query({ targets: [{ query: '12345' }] } as any)).toEmitValuesWith((response) => {
const field = response[0].data[0].fields[0];
expect(field.name).toBe('trace');
expect(field.type).toBe(FieldType.trace);
const fields = (response[0].data[0] as DataFrame).fields;
expect(
fields.map((f) => ({
name: f.name,
values: f.values.toArray(),
}))
).toMatchObject([
{ name: 'serviceTags', values: [{ key: 'servicetag1', value: 'service' }] },
{ name: 'logs', values: [{ timestamp: 12345, fields: [{ key: 'count', value: 1 }] }] },
{ name: 'tags', values: [{ key: 'tag1', value: 'val1' }] },
{ name: 'serviceName', values: ['service'] },
]);
});
});
});
function setupBackendSrv(response: any) {
const defaultMock = () => of(createFetchResponse(response));
function setupBackendSrv(frame: DataFrame) {
setBackendSrv({
fetch(): Observable<FetchResponse<BackendDataSourceResponse>> {
return of(
createFetchResponse({
results: {
refid1: {
dataframes: [encode(frame)],
},
},
})
);
},
} as any);
}
const fetchMock = jest.spyOn(backendSrv, 'fetch');
fetchMock.mockImplementation(defaultMock);
function encode(frame: DataFrame) {
const table = grafanaDataFrameToArrowTable(frame);
return arrowTableToBase64String(table);
}
const defaultSettings: DataSourceInstanceSettings = {

View File

@@ -1,11 +1,12 @@
import {
ArrayVector,
DataFrame,
DataQuery,
DataQueryRequest,
DataQueryResponse,
DataSourceInstanceSettings,
Field,
FieldType,
MutableDataFrame,
} from '@grafana/data';
import { DataSourceWithBackend } from '@grafana/runtime';
import { Observable } from 'rxjs';
@@ -27,22 +28,30 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery> {
return response;
}
return {
data: [
new MutableDataFrame({
fields: [
{
name: 'trace',
type: FieldType.trace,
values: [JSON.parse((response.data as DataFrame[])[0].fields[0].values.get(0))],
},
],
meta: {
preferredVisualisationType: 'trace',
},
}),
],
};
// We need to parse some of the fields, which contain stringified JSON.
// It seems we can't just map the values in place: the frame we got from the backend has some default processing
// and will stringify the JSON back when we try to set it. So we create a new field and swap it in instead.
const frame: DataFrame = response.data[0];
for (const fieldName of ['serviceTags', 'logs', 'tags']) {
const field = frame.fields.find((f) => f.name === fieldName);
if (field) {
const fieldIndex = frame.fields.indexOf(field);
const values = new ArrayVector();
const newField: Field = {
...field,
values,
type: FieldType.other,
};
for (let i = 0; i < field.values.length; i++) {
const value = field.values.get(i);
values.set(i, value === '' ? undefined : JSON.parse(value));
}
frame.fields[fieldIndex] = newField;
}
}
return response;
})
);
}

View File

@@ -21,6 +21,11 @@ function tryExpectations(received: any[], expectations: (received: any[]) => void
}
}
/**
* Collects all the values emitted by the observable (including errors) and passes them to the expectations
* function after the observable has completed (or errored). If the observable does not complete within
* OBSERVABLE_TEST_TIMEOUT_IN_MS, the test fails.
*/
export function toEmitValuesWith(
received: Observable<any>,
expectations: (actual: any[]) => void