Add anomalymap data to DataFrame context.

AnomalyMap is part of DataFrame and is able to merge from another
FrameResponse. The AnomalyMap is a trace-keyed sparsed array so we need
to iterate it like a hash table.

This will replace related methods in explore-simple-sk.

Bug: b/370804338, b/361354421
Change-Id: Ia03654b45e74aff2eb924b0b930e18874bf77a89
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/905339
Reviewed-by: Leina Sun <sunxiaodi@google.com>
Commit-Queue: Hao Wu <haowoo@google.com>
diff --git a/perf/modules/dataframe/dataframe_context.ts b/perf/modules/dataframe/dataframe_context.ts
index e68ec59..8a9853c 100644
--- a/perf/modules/dataframe/dataframe_context.ts
+++ b/perf/modules/dataframe/dataframe_context.ts
@@ -29,9 +29,15 @@
 import { LitElement } from 'lit';
 import { customElement } from 'lit/decorators.js';
 
-import { range } from './index';
+import { mergeAnomaly, range } from './index';
 import { fromParamSet } from '../../../infra-sk/modules/query';
-import { ColumnHeader, ShiftRequest, ShiftResponse } from '../json';
+import {
+  AnomalyMap,
+  ColumnHeader,
+  CommitNumberAnomalyMap,
+  ShiftRequest,
+  ShiftResponse,
+} from '../json';
 import {
   DataFrame,
   FrameRequest,
@@ -90,6 +96,8 @@
   Symbol('dataframe-repo-context')
 );
 
+const emptyResolver = (_1: number, _2: DataFrame, _3: AnomalyMap) => {};
+
 @customElement('dataframe-repository-sk')
 export class DataFrameRepository extends LitElement {
   private static shiftUrl = '/_/shift/';
@@ -125,6 +133,8 @@
   @provide({ context: dataframeRepoContext })
   dfRepo = this;
 
+  anomaly: { [key: string]: CommitNumberAnomalyMap } = {};
+
   // This element doesn't render anything and all the children should be
   // attached to itself directly.
   // Note, <slot></slot> is not needed if there is no shadow root.
@@ -273,12 +283,14 @@
    */
   async resetWithDataframeAndRequest(
     dataframe: DataFrame,
+    anomalies: AnomalyMap,
     request: FrameRequest
   ) {
     this._baseRequest = request;
     this._baseRequest.request_type = 0; // change to timestamp-based query.
 
     this.dataframe = dataframe;
+    this.anomaly = mergeAnomaly(this.anomaly, anomalies);
     this._header = dataframe.header || [];
     this._traceset = dataframe.traceset;
   }
@@ -292,11 +304,12 @@
    *  request completes.
    */
   async resetTraces(range: range, param: ReadOnlyParamSet) {
-    let resolver = (_1: number, _2: DataFrame) => {};
+    let resolver = emptyResolver;
     const curRequest = this._requestComplete;
     this._requestComplete = new Promise((resolve) => {
-      resolver = (n, df) => {
+      resolver = (n, df, anomaly) => {
         this.dataframe = df;
+        this.anomaly = mergeAnomaly(this.anomaly, anomaly);
         this.loading = false;
         resolve(n);
       };
@@ -311,12 +324,16 @@
     this._header = resp.dataframe?.header || [];
 
     const totalTraces = resp.dataframe?.header?.length || 0;
-    resolver(totalTraces, {
-      traceset: this._traceset,
-      header: this._header,
-      paramset: this._paramset,
-      skip: 0,
-    });
+    resolver(
+      totalTraces,
+      {
+        traceset: this._traceset,
+        header: this._header,
+        paramset: this._paramset,
+        skip: 0,
+      },
+      resp.anomalymap
+    );
     return totalTraces;
   }
 
@@ -330,12 +347,13 @@
    *  the request completes.
    */
   async extendRange(offsetInSeconds: number) {
-    let resolver = (_1: number, _2: DataFrame) => {};
+    let resolver = emptyResolver;
     const curRequest = this._requestComplete;
     this._requestComplete = new Promise((resolve) => {
-      resolver = (n, df) => {
+      resolver = (n, df, anomaly) => {
         this.loading = false;
         this.dataframe = df;
+        this.anomaly = mergeAnomaly(this.anomaly, anomaly);
         resolve(n);
       };
     });
@@ -375,12 +393,20 @@
     });
     this.addTraceset(header, traceset);
 
-    resolver(totalTraces, {
-      traceset: this._traceset,
-      header: this._header,
-      paramset: this._paramset,
-      skip: 0,
-    });
+    const anomaly = sortedResponses.reduce(
+      (pre, cur) => mergeAnomaly(pre, cur.anomalymap),
+      {}
+    );
+    resolver(
+      totalTraces,
+      {
+        traceset: this._traceset,
+        header: this._header,
+        paramset: this._paramset,
+        skip: 0,
+      },
+      anomaly
+    );
     return totalTraces;
   }
 
diff --git a/perf/modules/dataframe/dataframe_context_test.ts b/perf/modules/dataframe/dataframe_context_test.ts
index 637642e..36b30bb 100644
--- a/perf/modules/dataframe/dataframe_context_test.ts
+++ b/perf/modules/dataframe/dataframe_context_test.ts
@@ -6,7 +6,11 @@
 import { ColumnHeader, ReadOnlyParamSet } from '../json';
 import fetchMock from 'fetch-mock';
 import { setUpElementUnderTest } from '../../../infra-sk/modules/test_util';
-import { generateFullDataFrame, mockFrameStart } from './test_utils';
+import {
+  generateAnomalyMap,
+  generateFullDataFrame,
+  mockFrameStart,
+} from './test_utils';
 
 const now = 1726081856; // an arbitrary UNIX time;
 const timeSpan = 89; // an arbitrary prime number for time span between commits .
@@ -28,7 +32,12 @@
     ref_mode: ['head'],
   });
 
-  const df = generateFullDataFrame({ begin: 90, end: 120 }, now, 1, [timeSpan]);
+  const df = generateFullDataFrame({ begin: 90, end: 120 }, now, 3, [timeSpan]);
+  const anomaly = generateAnomalyMap(df, [
+    { commit: 5, bugId: 555 },
+    { commit: 15, bugId: 1515 },
+    { commit: 25, bugId: 2525 },
+  ]);
   afterEach(() => {
     fetchMock.reset();
   });
@@ -43,7 +52,7 @@
   });
 
   it('initialize w/ data', async () => {
-    mockFrameStart(df, paramset);
+    mockFrameStart(df, paramset, anomaly);
 
     const dfRepo = newEl();
     assert.equal(
@@ -56,15 +65,18 @@
       ),
       10
     );
+    // The trace key generated from generateFullDataFrame.
+    const traceKey = ',key=0';
     assert.isTrue(sorted(dfRepo.header));
     assert.sameOrderedMembers(
-      df.traceset[',key=0'].slice(1, 11),
-      dfRepo.traces[',key=0']
+      df.traceset[traceKey].slice(1, 11),
+      dfRepo.traces[traceKey]
     );
+    assert.equal(dfRepo.anomaly[traceKey]![95].bug_id, 555);
   });
 
   it('init data and extend range', async () => {
-    mockFrameStart(df, paramset);
+    mockFrameStart(df, paramset, anomaly);
 
     const dfRepo = newEl();
     assert.equal(
@@ -78,13 +90,18 @@
       10
     );
 
+    // The trace key generated from generateFullDataFrame.
+    const traceKey = ',key=0';
+    assert.isUndefined(dfRepo.anomaly[traceKey]![105]);
+
     assert.equal(await dfRepo.extendRange(timeSpan * 10), 10);
     assert.isTrue(sorted(dfRepo.header));
     assert.lengthOf(dfRepo.header, 20);
     assert.sameOrderedMembers(
-      df.traceset[',key=0'].slice(0, 20),
-      dfRepo.traces[',key=0']
+      df.traceset[traceKey].slice(0, 20),
+      dfRepo.traces[traceKey]
     );
+    assert.equal(dfRepo.anomaly[traceKey]![105].bug_id, 1515);
   });
 
   it('init data and extend range both ways', async () => {
diff --git a/perf/modules/dataframe/index.ts b/perf/modules/dataframe/index.ts
index 54ce796..9e669d2 100644
--- a/perf/modules/dataframe/index.ts
+++ b/perf/modules/dataframe/index.ts
@@ -1,6 +1,14 @@
 // Contains DataFrame merge logic, similar to //perf/go/dataframe/dataframe.go
 
-import { DataFrame, ParamSet, ColumnHeader, TraceSet, Trace } from '../json';
+import {
+  DataFrame,
+  ParamSet,
+  ColumnHeader,
+  TraceSet,
+  Trace,
+  AnomalyMap,
+  CommitNumberAnomalyMap,
+} from '../json';
 import {
   addParamSet,
   addParamsToParamSet,
@@ -45,6 +53,80 @@
   };
 };
 
+/**
+ * Merge two AnomalyMap and return a new AnomalyMap.
+ *
+ * This function always returns a non-nil AnomalyMap.
+ * @param anomaly1 The first anomaly.
+ * @param anomalies The list of anomaly to be merged.
+ * @returns The new AnomalyMap.
+ */
+export const mergeAnomaly = (
+  anomaly1: AnomalyMap,
+  ...anomalies: AnomalyMap[]
+) => {
+  const anomaly = structuredClone(anomaly1 || {});
+  if (anomalies.length <= 0) {
+    return structuredClone(anomaly1 || {});
+  }
+
+  /**
+   * {
+   *  ',trace=key1': {
+   *    commit_position1: Anomaly 1,
+   *    commit_position2: Anomaly 2,
+   *    ...
+   *  },
+   *  `,trace=key2': {...}
+   * }
+   */
+  anomalies.forEach((anomaly2) => {
+    for (const trace in anomaly2) {
+      // Merge the anomaly from anomaly2 for the trace.
+      // First we use the existing merged anomaly as the base, and then add
+      // or update the anomaly data at each commit.
+      const commitAnomaly = anomaly[trace] || {};
+      const commitAnomaly2 = anomaly2[trace];
+
+      // In each trace, the anomaly is in the sparsed array, so it's more
+      // efficient to iterater using keys.
+      for (const commit in commitAnomaly2) {
+        const commitNum = Number(commit);
+
+        // The anomaly at commitNum will either be ovrridden and added to the
+        // anomaly for the current trace.
+        commitAnomaly[commitNum] = commitAnomaly2![commitNum];
+      }
+
+      // Override with the updated anomaly for the trace.
+      anomaly[trace] = commitAnomaly;
+    }
+  });
+  return anomaly;
+};
+
+export const findAnomalyInRange = (
+  allAnomaly: AnomalyMap,
+  range: range
+): AnomalyMap => {
+  const anomaly: AnomalyMap = {};
+  for (const trace in allAnomaly) {
+    const commitAnomaly: CommitNumberAnomalyMap = {};
+    const traceAnomaly = allAnomaly![trace];
+    for (const commit in traceAnomaly) {
+      const commitNum = Number(commit);
+      if (commitNum >= range.begin && commitNum <= range.end) {
+        commitAnomaly[commitNum] = traceAnomaly[commitNum];
+      }
+    }
+
+    if (Object.keys(commitAnomaly).length > 0) {
+      anomaly[trace] = commitAnomaly;
+    }
+  }
+  return anomaly;
+};
+
 /** mergeColumnHeaders creates a merged header from the two given headers.
  *
  * I.e. {1,4,5} + {3,4} => {1,3,4,5}
diff --git a/perf/modules/dataframe/index_test.ts b/perf/modules/dataframe/index_test.ts
index 58e2e90..045b941 100644
--- a/perf/modules/dataframe/index_test.ts
+++ b/perf/modules/dataframe/index_test.ts
@@ -9,6 +9,8 @@
   join,
   timestampBounds,
   findSubDataframe,
+  mergeAnomaly,
+  findAnomalyInRange,
 } from './index';
 import {
   DataFrame,
@@ -20,7 +22,7 @@
   TimestampSeconds,
 } from '../json';
 import { MISSING_DATA_SENTINEL } from '../const/const';
-import { generateFullDataFrame } from './test_utils';
+import { generateAnomalyMap, generateFullDataFrame } from './test_utils';
 
 const e = MISSING_DATA_SENTINEL;
 
@@ -88,6 +90,91 @@
   });
 });
 
+describe('merge anomaly', () => {
+  const df = generateFullDataFrame(
+    { begin: 0, end: 20 },
+    1,
+    5,
+    [1, 3, 6, 4, 2]
+  );
+  const anomaly = generateAnomalyMap(df, [
+    { trace: 1, commit: 4, bugId: 4001 },
+    { trace: 2, commit: 4, bugId: 4002 },
+    { trace: 2, commit: 7, bugId: 7002 },
+  ]);
+  const updated = generateAnomalyMap(df, [
+    { trace: 1, commit: 4, bugId: 4101 },
+    { trace: 2, commit: 7, bugId: 7102 },
+  ]);
+
+  it('merge empty (always return non-null)', () => {
+    assert.isNotNull(mergeAnomaly(null));
+    assert.isEmpty(mergeAnomaly(null));
+    assert.isNotNull(mergeAnomaly(null, {}, null));
+  });
+
+  it('merge empty w/ non-empty', () => {
+    const anomaly1 = mergeAnomaly(
+      null,
+      findAnomalyInRange(anomaly, { begin: 5, end: 10 })
+    );
+    assert.isUndefined(anomaly1[',key=1']);
+    assert.equal(anomaly1[',key=2']![7].bug_id, 7002);
+  });
+
+  it('merge non-empty w/ non-empty', () => {
+    const anomaly1 = findAnomalyInRange(anomaly, { begin: 0, end: 5 })!;
+    assert.equal(anomaly1[',key=1']![4].bug_id, 4001);
+    assert.equal(anomaly1[',key=2']![4].bug_id, 4002);
+    assert.isUndefined(anomaly1[',key=2']![7]);
+
+    const anomaly2 = mergeAnomaly(
+      anomaly1,
+      findAnomalyInRange(anomaly, { begin: 5, end: 10 })
+    );
+    assert.equal(anomaly2[',key=1']![4].bug_id, 4001);
+    assert.equal(anomaly2[',key=2']![7].bug_id, 7002);
+  });
+
+  it('merge w/ updated entries', () => {
+    const anomaly1 = findAnomalyInRange(anomaly, { begin: 0, end: 5 })!;
+    assert.equal(anomaly1[',key=1']![4].bug_id, 4001);
+
+    const anomaly2 = mergeAnomaly(
+      anomaly1,
+      findAnomalyInRange(updated, { begin: 0, end: 10 })!
+    );
+    assert.equal(anomaly2[',key=1']![4].bug_id, 4101);
+    assert.equal(anomaly2[',key=2']![7].bug_id, 7102);
+  });
+
+  it('merge w/ new traces', () => {
+    const anomaly1 = findAnomalyInRange(anomaly, { begin: 5, end: 10 })!;
+    assert.equal(anomaly1[',key=2']![7].bug_id, 7002);
+    assert.isUndefined(anomaly1[',key=1']);
+
+    const anomaly2 = mergeAnomaly(
+      anomaly1,
+      findAnomalyInRange(anomaly, { begin: 0, end: 5 })!
+    );
+    assert.equal(anomaly2[',key=1']![4].bug_id, 4001);
+    assert.equal(anomaly2[',key=2']![4].bug_id, 4002);
+  });
+
+  it('merge w/ new and updated traces', () => {
+    const anomaly1 = findAnomalyInRange(anomaly, { begin: 5, end: 10 })!;
+    assert.equal(anomaly1[',key=2']![7].bug_id, 7002);
+    assert.isUndefined(anomaly1[',key=1']);
+
+    const anomaly2 = mergeAnomaly(
+      anomaly1,
+      findAnomalyInRange(updated, { begin: 0, end: 10 })!
+    );
+    assert.equal(anomaly2[',key=1']![4].bug_id, 4101);
+    assert.equal(anomaly2[',key=2']![7].bug_id, 7102);
+  });
+});
+
 describe('mergeColumnHeaders', () => {
   it('merges simple case', () => {
     const a: ColumnHeader[] = [
diff --git a/perf/modules/dataframe/test_utils.ts b/perf/modules/dataframe/test_utils.ts
index 44efc34..ef3ad69 100644
--- a/perf/modules/dataframe/test_utils.ts
+++ b/perf/modules/dataframe/test_utils.ts
@@ -6,8 +6,10 @@
   Trace,
   TraceSet,
   FrameRequest,
+  AnomalyMap,
+  Anomaly,
 } from '../json';
-import { findSubDataframe, range } from './index';
+import { findAnomalyInRange, findSubDataframe, range } from './index';
 import { fromParamSet } from '../../../infra-sk/modules/query';
 
 // Generates an array where the values are repeated from the template.
@@ -116,9 +118,39 @@
   };
 };
 
+/**
+ * Generates the AnomalyMap from the DataFrame using simple input.
+ * @param df The dataframe used for the base commit and trace.
+ * @param data The list of anomaly data
+ * @returns The AnomalyMap
+ */
+export const generateAnomalyMap = (
+  df: DataFrame,
+  data: {
+    trace?: number; // The trace index in DataFrame
+    commit: number; // The commit offset from the first commit in DataFrame
+    bugId?: number; // The bugId that can be used to validate test.
+  }[]
+) => {
+  if ((df.header?.length || 0) <= 0) {
+    return {};
+  }
+  const firstCommit = df.header![0]!.offset;
+  const anomaly: AnomalyMap = {};
+  data.forEach((each) => {
+    const traceKey = `,key=${each.trace || 0}`;
+    anomaly[traceKey] = anomaly[traceKey] || {};
+    anomaly[traceKey]![firstCommit + each.commit] = {
+      bug_id: each.bugId || 0,
+    } as Anomaly;
+  });
+  return anomaly;
+};
+
 export const mockFrameStart = (
   df: DataFrame,
   paramset: ReadOnlyParamSet,
+  anomaly: AnomalyMap = null,
   delayInMS: number = 0,
   mock: FetchMockStatic = fetchMock
 ) => {
@@ -144,6 +176,10 @@
         messages: [{ key: 'Loading', value: 'Finished' }],
         results: {
           dataframe: generateSubDataframe(df, subrange),
+          anomalymap: findAnomalyInRange(anomaly, {
+            begin: df.header![subrange.begin]?.offset || 0,
+            end: df.header![subrange.end]?.offset || 0,
+          }),
         },
       };
     }
diff --git a/perf/modules/explore-simple-sk/explore-simple-sk.ts b/perf/modules/explore-simple-sk/explore-simple-sk.ts
index fd7f143..2907a78 100644
--- a/perf/modules/explore-simple-sk/explore-simple-sk.ts
+++ b/perf/modules/explore-simple-sk/explore-simple-sk.ts
@@ -2318,7 +2318,11 @@
         errorMessage('Failed to find any matching traces.');
         return;
       }
-      this.dfRepo?.resetWithDataframeAndRequest(json.dataframe!, body);
+      this.dfRepo?.resetWithDataframeAndRequest(
+        json.dataframe!,
+        json.anomalymap,
+        body
+      );
       // TODO(seanmccullough): Verify that the following removeAll() call isn't necessary:
       // this.plot!.removeAll();
       this.addTraces(json, switchToTab);
@@ -2390,7 +2394,11 @@
     const switchToTab =
       body.formulas!.length > 0 || body.queries!.length > 0 || body.keys !== '';
     this.requestFrame(body, (json) => {
-      this.dfRepo?.resetWithDataframeAndRequest(json.dataframe!, body);
+      this.dfRepo?.resetWithDataframeAndRequest(
+        json.dataframe!,
+        json.anomalymap,
+        body
+      );
       this.plot!.removeAll();
       this.addTraces(json, switchToTab);
     });
@@ -2771,7 +2779,11 @@
     this._stateHasChanged();
     const body = this.requestFrameBodyFullFromState();
     this.requestFrame(body, (json) => {
-      this.dfRepo?.resetWithDataframeAndRequest(json.dataframe!, body);
+      this.dfRepo?.resetWithDataframeAndRequest(
+        json.dataframe!,
+        json.anomalymap,
+        body
+      );
       this.addTraces(json, true);
     });
   }