grafana/public/app/plugins/datasource/loki/streaming.ts
Josh Hunt 3c6e0e8ef8
Chore: ESlint import order (#44959)
* Add and configure eslint-plugin-import

* Fix the lint:ts npm command

* Autofix + prettier all the files

* Manually fix remaining files

* Move jquery code in jest-setup to external file to safely reorder imports

* Resolve issue caused by circular dependencies within Prometheus

* Update .betterer.results

* Fix missing // @ts-ignore

* ignore iconBundle.ts

* Fix missing // @ts-ignore
2022-04-22 14:33:13 +01:00

83 lines
2.6 KiB
TypeScript

import { map, Observable, defer, mergeMap } from 'rxjs';
import { DataFrameJSON, DataQueryRequest, DataQueryResponse, LiveChannelScope, LoadingState } from '@grafana/data';
import { getGrafanaLiveSrv } from '@grafana/runtime';
import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame';
import { LokiDatasource } from './datasource';
import { LokiQuery } from './types';
/**
* Calculate a unique key for the query. The key is used to pick a channel and should
* be unique for each distinct query execution plan. This key is not secure and is only picked to avoid
* possible collisions
*/
export async function getLiveStreamKey(query: LokiQuery): Promise<string> {
const str = JSON.stringify({ expr: query.expr });
const msgUint8 = new TextEncoder().encode(str); // encode as (utf-8) Uint8Array
const hashBuffer = await crypto.subtle.digest('SHA-1', msgUint8); // hash the message
const hashArray = Array.from(new Uint8Array(hashBuffer.slice(0, 8))); // first 8 bytes
return hashArray.map((b) => b.toString(16).padStart(2, '0')).join('');
}
// This will get both v1 and v2 result formats
export function doLokiChannelStream(
query: LokiQuery,
ds: LokiDatasource,
options: DataQueryRequest<LokiQuery>
): Observable<DataQueryResponse> {
// maximum time to keep values
const range = options.range;
const maxDelta = range.to.valueOf() - range.from.valueOf() + 1000;
let maxLength = options.maxDataPoints ?? 1000;
if (maxLength > 100) {
// for small buffers, keep them small
maxLength *= 2;
}
let frame: StreamingDataFrame | undefined = undefined;
const updateFrame = (msg: any) => {
if (msg?.message) {
const p = msg.message as DataFrameJSON;
if (!frame) {
frame = StreamingDataFrame.fromDataFrameJSON(p, {
maxLength,
maxDelta,
displayNameFormat: query.legendFormat,
});
} else {
frame.push(p);
}
}
return frame;
};
return defer(() => getLiveStreamKey(query)).pipe(
mergeMap((key) => {
return getGrafanaLiveSrv()
.getStream<any>({
scope: LiveChannelScope.DataSource,
namespace: ds.uid,
path: `tail/${key}`,
data: {
...query,
timeRange: {
from: range.from.valueOf().toString(),
to: range.to.valueOf().toString(),
},
},
})
.pipe(
map((evt) => {
const frame = updateFrame(evt);
return {
data: frame ? [frame] : [],
state: LoadingState.Streaming,
};
})
);
})
);
}