Explore: Fix unsubscribing from Loki websocket (#19263)

This commit is contained in:
Andrej Ocenas
2019-09-23 22:48:39 +02:00
committed by GitHub
parent 9018050eca
commit 4c1bc59889
5 changed files with 44 additions and 9 deletions

View File

@@ -37,10 +37,9 @@ describe('Live Stream Tests', () => {
const labels: Labels = { job: 'varlogs' };
const target = makeTarget('fake', labels);
const stream = new LiveStreams().getStream(target);
expect.assertions(5);
expect.assertions(4);
const tests = [
(val: DataFrame[]) => expect(val).toEqual([]),
(val: DataFrame[]) => {
expect(val[0].length).toEqual(7);
expect(val[0].labels).toEqual(labels);

View File

@@ -1,8 +1,8 @@
import { DataFrame, FieldType, parseLabels, KeyValue, CircularDataFrame } from '@grafana/data';
import { Observable, BehaviorSubject } from 'rxjs';
import { Observable } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';
import { LokiResponse } from './types';
import { finalize, map, multicast, refCount } from 'rxjs/operators';
import { finalize, map } from 'rxjs/operators';
import { appendResponseToBufferedData } from './result_transformer';
/**
@@ -32,7 +32,6 @@ export class LiveStreams {
data.addField({ name: 'line', type: FieldType.string });
data.addField({ name: 'labels', type: FieldType.other });
const subject = new BehaviorSubject<DataFrame[]>([]);
stream = webSocket(target.url).pipe(
finalize(() => {
delete this.streams[target.url];
@@ -40,9 +39,7 @@ export class LiveStreams {
map((response: LokiResponse) => {
appendResponseToBufferedData(response, data);
return [data];
}),
multicast(subject),
refCount()
})
);
this.streams[target.url] = stream;
}