2019-09-05 08:04:01 -04:00
|
|
|
import { DataFrame, FieldType, parseLabels, KeyValue, CircularDataFrame } from '@grafana/data';
|
2019-09-23 22:48:39 +02:00
|
|
|
import { Observable } from 'rxjs';
|
2019-09-05 08:04:01 -04:00
|
|
|
import { webSocket } from 'rxjs/webSocket';
|
2020-04-22 12:59:06 +01:00
|
|
|
import { LokiTailResponse } from './types';
|
2019-09-23 22:48:39 +02:00
|
|
|
import { finalize, map } from 'rxjs/operators';
|
2020-04-22 12:59:06 +01:00
|
|
|
import { appendResponseToBufferedData } from './result_transformer';
|
2019-09-05 08:04:01 -04:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Maps directly to a query in the UI (refId is key)
|
|
|
|
|
*/
|
2020-04-22 12:59:06 +01:00
|
|
|
export interface LokiLiveTarget {
|
2019-09-05 08:04:01 -04:00
|
|
|
query: string;
|
|
|
|
|
url: string;
|
|
|
|
|
refId: string;
|
|
|
|
|
size: number;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Cache of websocket streams that can be returned as observable. In case there already is a stream for particular
|
|
|
|
|
* target it is returned and on subscription returns the latest dataFrame.
|
|
|
|
|
*/
|
|
|
|
|
export class LiveStreams {
|
|
|
|
|
private streams: KeyValue<Observable<DataFrame[]>> = {};
|
|
|
|
|
|
2020-04-22 12:59:06 +01:00
|
|
|
getStream(target: LokiLiveTarget): Observable<DataFrame[]> {
|
2019-09-05 08:04:01 -04:00
|
|
|
let stream = this.streams[target.url];
|
2019-11-15 15:38:25 +00:00
|
|
|
|
|
|
|
|
if (stream) {
|
|
|
|
|
return stream;
|
2019-09-05 08:04:01 -04:00
|
|
|
}
|
2019-11-15 15:38:25 +00:00
|
|
|
|
|
|
|
|
const data = new CircularDataFrame({ capacity: target.size });
|
|
|
|
|
data.addField({ name: 'ts', type: FieldType.time, config: { title: 'Time' } });
|
2020-01-26 23:13:56 +01:00
|
|
|
data.addField({ name: 'tsNs', type: FieldType.time, config: { title: 'Time ns' } });
|
2019-11-15 15:38:25 +00:00
|
|
|
data.addField({ name: 'line', type: FieldType.string }).labels = parseLabels(target.query);
|
|
|
|
|
data.addField({ name: 'labels', type: FieldType.other }); // The labels for each line
|
|
|
|
|
data.addField({ name: 'id', type: FieldType.string });
|
|
|
|
|
|
|
|
|
|
stream = webSocket(target.url).pipe(
|
|
|
|
|
finalize(() => {
|
|
|
|
|
delete this.streams[target.url];
|
|
|
|
|
}),
|
|
|
|
|
|
|
|
|
|
map((response: LokiTailResponse) => {
|
|
|
|
|
appendResponseToBufferedData(response, data);
|
|
|
|
|
return [data];
|
|
|
|
|
})
|
|
|
|
|
);
|
|
|
|
|
this.streams[target.url] = stream;
|
|
|
|
|
|
2019-09-05 08:04:01 -04:00
|
|
|
return stream;
|
|
|
|
|
}
|
|
|
|
|
}
|