TestData: stream via fetch (#16963)

Stream from fetch CSV
This commit is contained in:
Ryan McKinley
2019-05-09 09:42:35 -07:00
committed by GitHub
parent 0c1530c7a8
commit edcbc11774
5 changed files with 74 additions and 3 deletions

View File

@@ -9,6 +9,7 @@ import {
DataStreamState,
LoadingState,
LogLevel,
CSVReader,
} from '@grafana/ui';
import { TestDataQuery, StreamingQuery } from './types';
@@ -57,6 +58,8 @@ export class StreamHandler {
this.workers[key] = new SignalWorker(key, query, req, observer);
} else if (type === 'logs') {
this.workers[key] = new LogsWorker(key, query, req, observer);
} else if (type === 'fetch') {
this.workers[key] = new FetchWorker(key, query, req, observer);
} else {
throw {
message: 'Unknown Stream type: ' + type,
@@ -72,6 +75,7 @@ export class StreamHandler {
* Manages a single stream request
*/
export class StreamWorker {
refId: string;
query: StreamingQuery;
stream: DataStreamState;
observer: DataStreamObserver;
@@ -85,6 +89,7 @@ export class StreamWorker {
request,
unsubscribe: this.unsubscribe,
};
this.refId = query.refId;
this.query = query.stream;
this.last = Date.now();
this.observer = observer;
@@ -207,6 +212,56 @@ export class SignalWorker extends StreamWorker {
};
}
export class FetchWorker extends StreamWorker {
csv: CSVReader;
reader: ReadableStreamReader<Uint8Array>;
constructor(key: string, query: TestDataQuery, request: DataQueryRequest, observer: DataStreamObserver) {
super(key, query, request, observer);
if (!query.stream.url) {
throw new Error('Missing Fetch URL');
}
if (!query.stream.url.startsWith('http')) {
throw new Error('Fetch URL must be absolute');
}
this.csv = new CSVReader({ callback: this });
fetch(new Request(query.stream.url)).then(response => {
this.reader = response.body.getReader();
this.reader.read().then(this.processChunk);
});
}
processChunk = (value: ReadableStreamReadResult<Uint8Array>): any => {
if (this.observer == null) {
return; // Nothing more to do
}
if (value.value) {
const text = new TextDecoder().decode(value.value);
this.csv.readCSV(text);
}
if (value.done) {
console.log('Finished stream');
this.stream.state = LoadingState.Done;
return;
}
return this.reader.read().then(this.processChunk);
};
onHeader = (series: SeriesData) => {
series.refId = this.refId;
this.stream.series = [series];
};
onRow = (row: any[]) => {
// TODO?? this will send an event for each row, even if the chunk passed a bunch of them
this.appendRows([row]);
};
}
export class LogsWorker extends StreamWorker {
index = 0;