blob: 5ef487094c25b661cc0675c8aa3d0a3ed3401104 [file] [log] [blame]
/**
* @module modules/dataframe/dataframe_context
* @description <dataframe-repository-sk/>
*
* DataFrameRepository manages a local cache of the traces.
*
* The class loads the traces with the initial paramset and extends using the
 * same paramset. The paramset can only be reset by reloading the entire
 * traces; in other words, the paramset is immutable.
*
* @provide
* dataframeContext: the DataFrame storing the current available data.
*
* @provide
* dataframeLoadingContext: whether there is a request in flight.
*
* @provide
* dataframeRepoContext: the repo object itself to manage data.
*
* @example
* In the html page, you attach the <dataframe-repository-sk> as a parent node,
* then in the child node implementation, you can request the context by:
* @consume({context:dataframeContext})
* dataframe: DataFrame
*
* this.dataframe will be assigned whenever there is a change.
*/
import { createContext, provide } from '@lit/context';
import { LitElement } from 'lit';
import { customElement } from 'lit/decorators.js';
import { mergeAnomaly, range } from './index';
import { fromParamSet } from '../../../infra-sk/modules/query';
import {
AnomalyMap,
ColumnHeader,
CommitNumberAnomalyMap,
ShiftRequest,
ShiftResponse,
} from '../json';
import {
DataFrame,
FrameRequest,
FrameResponse,
Trace,
TraceSet,
ReadOnlyParamSet,
} from '../json';
import { startRequest, messageByName } from '../progress/progress';
// Compute the range adjacent to the given range.
// A positive offset yields the range immediately after `end`, spanning
// (offset + 1) points; a non-positive offset yields the range immediately
// before `begin`, spanning (-offset + 1) points.
// e.g. deltaRange({begin: 0, end: 10}, 10) gives {begin: 11, end: 21}.
const deltaRange = (range: range, offset: number) => {
  const { begin, end } = range;
  return offset > 0
    ? { begin: end + 1, end: end + offset + 1 }
    : { begin: begin + offset - 1, end: begin - 1 };
};
// Slice a range into smaller consecutive subranges of chunkSize points each.
// The range is inclusive on both ends, so its length is (end - begin) + 1.
// e.g. sliceRange({begin: 0, end: 20}, 10) yields:
//   {begin: 0, end: 9}
//   {begin: 10, end: 19}
//   {begin: 20, end: 20}
// Each subrange is also inclusive on both ends. A non-positive chunkSize
// performs no slicing at all.
export const sliceRange = ({ begin, end }: range, chunkSize: number) => {
  if (chunkSize <= 0) {
    return [{ begin, end }];
  }
  // Both ends inclusive, hence the +1.
  const total = end - begin + 1;
  const sliceCount = Math.ceil(total / chunkSize);
  // Fast path: the whole range fits in a single chunk (the common case).
  if (sliceCount === 1) {
    return [{ begin, end }];
  }
  // Emit the first (sliceCount - 1) full-size chunks, then a final chunk
  // that simply runs to `end` since it is usually shorter.
  const result: range[] = [];
  for (let k = 0; k < sliceCount - 1; k++) {
    result.push({
      begin: begin + k * chunkSize,
      end: begin + (k + 1) * chunkSize - 1,
    });
  }
  result.push({ begin: begin + (sliceCount - 1) * chunkSize, end: end });
  return result;
};
// This context provides the dataframe when it is ready to use from the data
// store, typically a remote server or a local mock.
export const dataframeContext = createContext<DataFrame>(
  Symbol('dataframe-context')
);

// This context indicates whether there is an ongoing dataframe request.
export const dataframeLoadingContext = createContext<boolean>(
  Symbol('dataframe-loading-context')
);

// This context provides the data repository to query the data.
export const dataframeRepoContext = createContext<DataFrameRepository>(
  Symbol('dataframe-repo-context')
);

// Placeholder resolver used before a real request is in flight; the
// arguments (count, dataframe, anomaly map) are deliberately ignored.
const emptyResolver = (_1: number, _2: DataFrame, _3: AnomalyMap) => {};
/**
 * <dataframe-repository-sk> manages a local cache of traces loaded for a
 * fixed paramset. It renders nothing itself; it only provides contexts (the
 * current dataframe, a loading flag, and this repository) to descendants.
 */
@customElement('dataframe-repository-sk')
export class DataFrameRepository extends LitElement {
  // Endpoint used by shift() to translate a commit range.
  private static shiftUrl = '/_/shift/';

  // Endpoint used by requestNewRange() to start a frame request.
  private static frameStartUrl = '/_/frame/start';

  // The paramset all cached traces were fetched with. Only replaced by
  // resetTraces().
  private _paramset = ReadOnlyParamSet({});

  // Most of logic to create the request is still in explore-simple-sk, we make
  // it simple to refactor that takes whatever it has and simply extending its
  // range.
  private _baseRequest?: FrameRequest;

  // The cached traces, keyed by trace key.
  private _traceset = TraceSet({});

  // Column headers for the cached data, kept in time order.
  private _header: (ColumnHeader | null)[] = [];

  // When the requests are more than this size, we slice into separate requests
  // so they run concurrently to improve data fetching performance.
  private chunkSize = 30 * 24 * 60 * 60; // 1 month in seconds.

  // The current dataframe, published to consumers of dataframeContext
  // whenever a request completes.
  @provide({ context: dataframeContext })
  dataframe: DataFrame = {
    traceset: TraceSet({}),
    header: [],
    paramset: ReadOnlyParamSet({}),
    skip: 0,
  };

  // True while a reset/extend request is in flight.
  @provide({ context: dataframeLoadingContext })
  loading = false;

  // The repository itself, so descendants can trigger loads.
  @provide({ context: dataframeRepoContext })
  dfRepo = this;

  // Anomalies accumulated (merged) across requests, keyed by trace key.
  anomaly: { [key: string]: CommitNumberAnomalyMap } = {};

  // This element doesn't render anything and all the children should be
  // attached to itself directly.
  // Note, <slot></slot> is not needed if there is no shadow root.
  protected createRenderRoot(): HTMLElement | DocumentFragment {
    return this;
  }

  get paramset() {
    return this._paramset;
  }

  get traces() {
    return this._traceset;
  }

  get header() {
    return this._header;
  }

  // The cached paramset serialized as a URL query string.
  get queryString() {
    return fromParamSet(this.paramset);
  }

  // True when no data has been loaded yet.
  get isEmpty(): boolean {
    const header = this.header;
    return !header || header!.length === 0;
  }

  // Returns the inclusive [first, last] span of the cached headers for the
  // given key, or {begin: 0, end: 0} when the cache is empty.
  getRange(key: 'offset' | 'timestamp'): range {
    if (this.isEmpty) {
      return { begin: 0, end: 0 };
    }
    const header = this.header!;
    const len = header!.length;
    if (len > 1) {
      return { begin: header[0]![key], end: header[len - 1]![key] };
    } else {
      return { begin: header[0]![key], end: header[0]![key] };
    }
  }

  // The cached range in commit offsets.
  get commitRange(): range {
    return this.getRange('offset');
  }

  // The cached range in timestamps (seconds).
  get timeRange(): range {
    return this.getRange('timestamp');
  }

  // Prepends or appends the given headers and traces to the cache, depending
  // on whether the new data starts at or after the end of the cached time
  // range.
  private addTraceset(header: (ColumnHeader | null)[], traceset: TraceSet) {
    if (header.length <= 0) {
      return;
    }
    // TODO(haowoo): this now is simply to prepend or append the given data,
    // however, we should also try to replace duplicate if there is time
    // overlap, or insert into middle of the dataset. This is not needed at
    // the moment.
    const isAfter = header[0]!.timestamp >= this.timeRange.end;
    if (isAfter) {
      this._header = (this.header || []).concat(header);
    } else {
      this._header = header.concat(this.header);
    }
    // Swap head/tail so the existing traces land on the correct side of the
    // new data.
    let traceHead = TraceSet({}),
      traceTail = this.traces;
    if (isAfter) {
      [traceHead, traceTail] = [traceTail, traceHead];
    }
    // NOTE(review): iterates the cached keys and spreads traceset[key];
    // this assumes every cached key is present in the incoming traceset —
    // confirm, since spreading undefined would throw.
    Object.keys(this.traces).forEach((key) => {
      this._traceset[key] = (traceHead[key] || [])
        .concat(...traceset[key])
        .concat(traceTail[key] || []) as Trace;
    });
  }

  // Track the current ongoing request.
  // Initialized as resolved because there are no requests.
  private _requestComplete = Promise.resolve(0);

  /**
   * Returns a Promise that resolves when the ongoing request completes.
   */
  get requestComplete(): Promise<number> {
    return this._requestComplete;
  }

  // Builds a timestamp-based (request_type 0) FrameRequest for the given
  // range using the cached paramset and the browser's timezone.
  private generateFrameRequest({ begin, end }: range) {
    return {
      begin: begin,
      end: end,
      request_type: 0,
      queries: [this.queryString],
      tz: Intl.DateTimeFormat().resolvedOptions().timeZone,
    } as FrameRequest;
  }

  // Fetches a FrameResponse for the given timestamp range, cloning the base
  // request (creating it first if needed) with only begin/end replaced.
  // Returns an empty FrameResponse when the request fails or does not
  // finish; rejects only when the server attaches an explicit message.
  private async requestNewRange(range: range) {
    if (!this._baseRequest) {
      this._baseRequest = this.generateFrameRequest(range);
    }
    const req = structuredClone(this._baseRequest);
    req.begin = range.begin;
    req.end = range.end;
    const resp = await startRequest(
      DataFrameRepository.frameStartUrl,
      req,
      1000
    ).catch(() => {
      return null;
    });
    if (resp === null) {
      // We silently fail when the server returns an error for now.
      console.log('fetch frame response failed.');
      return {} as FrameResponse;
    }
    if (resp?.status !== 'Finished') {
      // This usually happens when there is no commits for the given date,
      // We emit an empty range so the UI still functions just w/o any data,
      // The caller may handle this empty return gracefully on its own.
      console.log(
        'request range (',
        new Date(range.begin * 1000),
        new Date(range.end * 1000),
        ') failed with msg:',
        resp?.messages
      );
      return {} as FrameResponse;
    }
    const msg = messageByName(resp.messages, 'Message');
    if (msg) {
      return Promise.reject(msg);
    }
    return resp.results as FrameResponse;
  }

  /**
   * Reset the dataframe and its corresponding FrameRequest.
   *
   * This can be used to refill the data if it is from a different source, and
   * this can take over and extend the time frame.
   *
   * @param dataframe The full dataframe from the request.
   * @param anomalies Anomalies to merge into the cached anomaly map.
   * @param request The completed FrameRequest that was sent for the dataframe.
   */
  async resetWithDataframeAndRequest(
    dataframe: DataFrame,
    anomalies: AnomalyMap,
    request: FrameRequest
  ) {
    this._baseRequest = request;
    this._baseRequest.request_type = 0; // change to timestamp-based query.
    this.dataframe = dataframe;
    this.anomaly = mergeAnomaly(this.anomaly, anomalies);
    this._header = dataframe.header || [];
    this._traceset = dataframe.traceset;
  }

  /**
   * Reset and load the dataframe with the ParamSet.
   *
   * @param range The requested timestamp range in seconds.
   * @param param The paramset to fetch the dataframe.
   * @returns The promise that resolves to the length of traces when the
   * request completes.
   */
  async resetTraces(range: range, param: ReadOnlyParamSet) {
    let resolver = emptyResolver;
    // Replace _requestComplete before awaiting the previous request so that
    // concurrent calls are serialized behind each other.
    const curRequest = this._requestComplete;
    this._requestComplete = new Promise((resolve) => {
      resolver = (n, df, anomaly) => {
        this.dataframe = df;
        this.anomaly = mergeAnomaly(this.anomaly, anomaly);
        this.loading = false;
        resolve(n);
      };
    });
    await curRequest;
    this.loading = true;
    this._paramset = param;
    const resp = await this.requestNewRange(range);
    this._traceset = resp.dataframe?.traceset || TraceSet({});
    this._header = resp.dataframe?.header || [];
    // NOTE(review): this counts header columns (commit points), not traces,
    // despite the name — confirm the intended meaning with callers.
    const totalTraces = resp.dataframe?.header?.length || 0;
    resolver(
      totalTraces,
      {
        traceset: this._traceset,
        header: this._header,
        paramset: this._paramset,
        skip: 0,
      },
      resp.anomalymap
    );
    return totalTraces;
  }

  /**
   * Prepend or append additional traces to the dataframe using the same
   * ParamSet.
   *
   * @param offsetInSeconds The offset extending, positive means forward,
   * negative means backward.
   * @returns The promise that resolves to the length of additional traces when
   * the request completes.
   */
  async extendRange(offsetInSeconds: number) {
    let resolver = emptyResolver;
    // Serialize behind any in-flight request, same as resetTraces().
    const curRequest = this._requestComplete;
    this._requestComplete = new Promise((resolve) => {
      resolver = (n, df, anomaly) => {
        this.loading = false;
        this.dataframe = df;
        this.anomaly = mergeAnomaly(this.anomaly, anomaly);
        resolve(n);
      };
    });
    await curRequest;
    this.loading = true;
    // Compute the adjacent range and fetch it in chunkSize pieces, issued
    // concurrently.
    const range = deltaRange(this.timeRange, offsetInSeconds);
    const ranges = sliceRange(range, this.chunkSize);
    const allResponses: Promise<FrameResponse>[] = [];
    for (let slice = 0; slice < ranges.length; slice++) {
      allResponses.push(this.requestNewRange(ranges[slice]));
    }
    // Fetch and sort the frame responses so they can be appended
    // consecutively.
    const sortedResponses = (await Promise.all(allResponses))
      .filter(
        // Filter responses with valid dataframes with actual traces.
        (fr) =>
          fr.dataframe &&
          (fr.dataframe.header?.length || 0) > 0 &&
          Object.keys(fr.dataframe.traceset).length > 0
      )
      .sort(
        (a: FrameResponse, b: FrameResponse) =>
          a.dataframe!.header![0]!.offset - b.dataframe!.header![0]!.offset
      );
    // NOTE(review): as in resetTraces(), this sums header (commit) counts,
    // not trace counts — confirm.
    const totalTraces = sortedResponses
      .map((resp) => resp.dataframe!.header?.length || 0)
      .reduce((count, cur) => count + cur, 0);
    // Stitch the per-chunk headers together in time order.
    const header = ([] as (ColumnHeader | null)[]).concat(
      ...sortedResponses.map((resp) => resp.dataframe!.header)
    );
    // Stitch the per-chunk traces together, keyed by the cached trace keys.
    const traceset = TraceSet({});
    Object.keys(this.traces).forEach((key) => {
      traceset[key] = (traceset[key] || []).concat(
        ...sortedResponses.map((resp) => resp.dataframe!.traceset[key])
      ) as Trace;
    });
    this.addTraceset(header, traceset);
    const anomaly = sortedResponses.reduce(
      (pre, cur) => mergeAnomaly(pre, cur.anomalymap),
      {}
    );
    resolver(
      totalTraces,
      {
        traceset: this._traceset,
        header: this._header,
        paramset: this._paramset,
        skip: 0,
      },
      anomaly
    );
    return totalTraces;
  }

  // POSTs a ShiftRequest for the range adjacent to the given commit range
  // (default: 200 commits backward) and returns the parsed ShiftResponse.
  // Rejects with the HTTP status text on a non-OK response.
  protected async shift(commitRange: range, offset: number = -200) {
    const req = deltaRange(commitRange, offset) as ShiftRequest;
    const resp = await fetch(DataFrameRepository.shiftUrl, {
      method: 'POST',
      body: JSON.stringify(req),
      headers: {
        'Content-Type': 'application/json',
      },
    });
    if (!resp.ok) {
      return Promise.reject(resp.statusText);
    }
    return (await resp.json()) as ShiftResponse;
  }
}