import deepEqual from 'fast-deep-equal';
import Handlebars from 'handlebars';
import { v1 as uuidv1 } from 'uuid';

import { reThrow } from '../errors/errorLog';
import { getErrorString } from '../errors/errorString';
import { getLogger, Loggers, LogLevels } from '../loggerSupport';
import { FILTER_MODEL, NEXT_TOKEN } from '../metadataSupportConstants';
import { UseTestData } from '../schemaManager';
import { StageType } from '../types';
import { extractHandlebarsKeys, stringify } from '../utilityFunctions';

import { GraphQLExecutor } from './graphQLExecutor';
import {
  ICreatePipelineExecutorParams,
  IPipelineExternal,
  PipelineManager,
  SYSTEM_EXIT,
} from './pipelineManager';
import { StageImpl } from './stageImpl';

// Options shared by the two specialised loggers below: both are created at Warn level.
const WARN_LEVEL_OPTIONS = { level: LogLevels.Warn };

// General logger used for pipeline and stage start/end messages and stage errors.
const logger = getLogger({ name: Loggers.PIPELINE_MANAGER });

// Logger for messages about executing or suppressing the executor callback.
const logBatching = getLogger({
  name: Loggers.PIPELINE_MANAGER_BATCHING,
  ...WARN_LEVEL_OPTIONS,
});

// Logger whose Info level switches on tagging recorded results with a __resultId.
const logRecordQuery = getLogger({
  name: Loggers.PIPELINE_MANAGER_RECORD_QUERY,
  ...WARN_LEVEL_OPTIONS,
});

// These mirror the Kendo specification for filters (DataSourceParameterMapDataFilter):
// https://docs.telerik.com/kendo-ui/api/javascript/data/datasource/configuration/filter#filter
// We declare our own types instead of importing Kendo's, because universal code must not
// have any UI dependencies; the two sets of types are completely compatible.
/** Value allowed in the `logic` member of a filter. */
type Logic = 'and' | 'or';

/** Equality and ordering comparisons. */
type ComparisonOperator = 'eq' | 'neq' | 'lt' | 'lte' | 'gt' | 'gte';

/** Null and empty checks. */
type PresenceOperator = 'isnull' | 'isnotnull' | 'isempty' | 'isnotempty';

/** Prefix, suffix and substring matching. */
type TextOperator =
  | 'startswith'
  | 'doesnotstartwith'
  | 'endswith'
  | 'doesnotendwith'
  | 'contains'
  | 'doesnotcontain';

/** Every value allowed in the `operator` member of a filter. */
type Operator = ComparisonOperator | PresenceOperator | TextOperator;

/**
 * A filter node. Every member is optional, and `filters` may hold further
 * filter nodes, so filters can be nested.
 */
export interface IPipelineFilter {
  /** Value allowed for combining: 'and' or 'or'. */
  logic?: Logic;
  /** Nested filter nodes. */
  filters?: IPipelineFilter[];
  /** Name of a field. */
  field?: string;
  /** One of the supported operator names. */
  operator?: Operator;
  /** Any value; its interpretation is not defined by this type. */
  value?: any;
}

/** One sort key: a required field name and an optional descending flag. */
export interface ISortKey {
  /** Optional flag; may be omitted. */
  descending?: boolean;
  /** Name of the field (required). */
  field: string;
}

/** One entry in the executor's stack of pages. */
export interface IPage {
  /** Optional row number at which this page starts. */
  startingRowNumber?: number;
  /** The nextToken value where this page begins. */
  [NEXT_TOKEN]: string;
}

/**
 * Executes an ordered list of pipeline stages on behalf of a PipelineManager.
 *
 * The constructor only stores its arguments; `init()` builds the stages.
 * `execute()` then runs them, storing each recorded stage result in `output`
 * under the stage's name. `close()` closes the stages and notifies the manager.
 */
export class PipelineExecutor {
  public pipelineManager: PipelineManager;

  // The creation parameters exactly as passed to the constructor; consumed by init()
  public params: ICreatePipelineExecutorParams;

  public stages: StageImpl[];
  // Index (into stages) of the stage executeInternal is currently working on
  public stageIndex: number;

  public closed: boolean;
  // If there is a callback, the pipeline can only be executed once, as the queries are kept open
  // in the callback case, so if you want to re-execute, you have to close the executor and create
  // another.
  private executed: boolean;

  // Immutable inputs of the pipeline
  public input: any;
  public inputGlobal: any;
  public inputWidget: any;

  // Cumulative execution results, the output of the pipeline
  public output: any;

  // Properties for each stage (K: stage name, V: that stage's workingProperties)
  public properties: any;

  public external: IPipelineExternal;

  // (K: stageName, V: state.workingProperties)
  public propertiesByStage: any;

  // Optionally specified by caller to be called when the lead query changes
  public callback: (executor: PipelineExecutor) => any;

  // FIXME - for Javascript the nested fields are handled by dot separators, like a.b.c, but for
  // GraphQL, each field is an array. See extractHandlebarsKeys. This should be normalized to the
  // dot separated string.
  public requestedFields: Array<string | string[]>;
  public requiredFields: Array<string | string[]>;

  public suppressPipelineDumpOnError: boolean;
  public errorLogger: (object) => any;

  // This pipeline will execute in paged mode (read only)
  public paged: boolean;
  private pagedStageIndex = 0;

  // Stack of next tokens read from this pipeline to support pageForward and pageBackward
  // operations (read only). Initialized eagerly so the paging helpers are safe to call
  // before the first execute().
  private pages: IPage[] = [];

  public noMorePages: boolean;
  // The ag-grid filter model
  private filterModel: any = {};

  public outstandingChunks: number;

  // Used only for paging support
  public filter: IPipelineFilter;

  public sortKeys: ISortKey[];
  public minimumPageSize: number;

  // Below are for internal use only

  public executing: boolean;
  public aborting: boolean;

  // Used to ignore async results that come in after the pipeline has been restarted
  public executionIncarnation = 0;

  public requestId: string;
  public executionId: string;

  public sequenceNumber: number;
  public tracingIdentifier: string;
  public category: string;

  // Set when an execution request arrives while a callback pipeline is already executing:
  // the pipeline is re-run once the current execution finishes, using the input and
  // starting stage saved below.
  public executeAfterFinished: boolean;

  // Input saved from the most recent concurrent request, passed to the deferred re-run
  public concurrentInput: Record<string, any>;

  // Earliest starting stage index requested by the pending concurrent request(s); the
  // deferred re-run starts here
  public concurrentStageIndex: number;

  public graphQLExecutor: GraphQLExecutor;

  public configName: string;
  public useTestData: UseTestData;
  public executionConfigName: string;

  public failedValidation: boolean;

  /**
   * Stores the manager and creation parameters. No stages are built here; call `init()`.
   *
   * @param manager The owning PipelineManager (required)
   * @param params  Creation parameters, read later by `init()`
   * @throws Error if `manager` is missing
   */
  constructor(manager: PipelineManager, params: ICreatePipelineExecutorParams) {
    if (!manager) {
      throw new Error(
        `Pipeline may not be created without the PipelineManager: ${params?.stages}`,
      );
    }
    this.params = params;
    this.pipelineManager = manager;
  }

  /**
   * Builds the executor state from `params`: creates a StageImpl per stage definition,
   * assigns default stage names, seeds each stage's working properties, runs each stage
   * type's evaluator, and collects the requested/required field lists.
   *
   * @throws Error if no stages were specified, a stage name is duplicated, a stage type is
   *   unknown to the manager, or more than one stage is marked `_paged`
   */
  public init() {
    const { params, pipelineManager } = this;
    if (!params.stages) {
      throw new Error('No stages were specified');
    }
    this.stages = params.stages.map((s) => new StageImpl(s));
    this.callback = params.callback;
    this.errorLogger = params.errorLogger;
    this.category = params.category;
    this.requestId = params.requestId || uuidv1();
    this.executionId = params.executionId || uuidv1();
    // NOTE(review): configName falls back to schemaManager.executionConfigName, the same
    // fallback used for executionConfigName below - confirm this is intended.
    this.configName =
      params.configName || pipelineManager.schemaManager.executionConfigName;
    this.useTestData =
      params.useTestData || pipelineManager.schemaManager.useTestData;
    this.executionConfigName =
      params.executionConfigName ||
      pipelineManager.schemaManager.executionConfigName;
    this.tracingIdentifier = params.tracingIdentifier;
    this.sequenceNumber = params.sequenceNumber;
    this.external = params.external;

    this.input = params.input || {};
    this.inputGlobal = params.inputGlobal;
    this.inputWidget = params.inputWidget;
    this.output = {};
    this.properties = {};

    const stageSet = new Set();
    let index = 0;
    this.graphQLExecutor = new GraphQLExecutor(this);

    // Keyed by the string form of the field so duplicates collapse to one entry
    const requestedFieldsMap = {};
    const requiredFieldsMap = {};
    for (const s of this.stages) {
      // Unnamed stages get a default name: 'data' for the last stage, 'stage<index>' otherwise
      if (!s.originalProperties._name) {
        if (index === this.stages.length - 1) {
          s.originalProperties._name = 'data';
        } else {
          s.originalProperties._name = `stage${index}`;
        }
      }
      if (stageSet.has(s.originalProperties._name)) {
        throw new Error(
          `Stage name ${s.originalProperties._name} already exists in \n${this}`,
        );
      }

      s.stageIndex = index;

      // Working properties start as a shallow copy of the original properties
      s.workingProperties = Object.assign({}, s.originalProperties);
      if (!s.workingProperties._testContext) {
        s.workingProperties._testContext = this.params.testContext;
      }
      if (!s.workingProperties._requestId) {
        s.workingProperties._requestId = this.requestId;
      }
      if (!s.workingProperties._executionId) {
        s.workingProperties._executionId = this.executionId;
      }
      s.executor = this;
      stageSet.add(s.originalProperties._name);
      this.properties[s.originalProperties._name] = s.workingProperties;
      const si = this.pipelineManager.getStageInfo(
        s.originalProperties._stageType,
      );
      if (!si) {
        throw new Error(
          `Invalid execution type in ${s.originalProperties._stageType} in\n${this}`,
        );
      }

      // Figure out required things that affect the executor
      if (si.evaluator) {
        si.evaluator(this, s);
      }

      s.requestedFields.forEach((rf) => (requestedFieldsMap[rf] = rf));
      s.requiredFields.forEach((rf) => (requiredFieldsMap[rf] = rf));
      if (s.workingProperties._paged) {
        if (this.paged) {
          throw new Error(
            'Multiple paged queries in pipeline; only one is supported',
          );
        }
        this.paged = true;
        this.pagedStageIndex = s.stageIndex;
      }

      // Any key referenced by a Handlebars template in a string property is a requested
      // field, and also a required one when the stage sets _requireHandlebarsValues
      Object.values(s.originalProperties).forEach((p) => {
        if (typeof p === 'string') {
          const handleBarsKeys = extractHandlebarsKeys(p);
          // Each key is an array of the fields, like ['a', 'b'], so use the string value to check for dups
          handleBarsKeys.forEach((hk) => {
            requestedFieldsMap[hk.toString()] = hk;
            if (s.workingProperties._requireHandlebarsValues) {
              requiredFieldsMap[hk.toString()] = hk;
            }
          });
        }
      });

      index++;
    }

    this.requestedFields = Object.values(requestedFieldsMap);
    this.requiredFields = Object.values(requiredFieldsMap);
  }

  /**
   * Returns a short label for log and error messages, built from the tracing identifier,
   * category and sequence number when they are set.
   */
  public getTracingInfo() {
    if (this.tracingIdentifier) {
      return `${this.tracingIdentifier}-${this.category} (${this.sequenceNumber})`;
    } else if (this.sequenceNumber) {
      return `${this.sequenceNumber}`;
    }
    return '<no tracingIdentifier>';
  }

  /**
   * Runs the pipeline, resetting all paging state first.
   *
   * @param input              Optional values merged into the pipeline input
   * @param startingStageIndex Optional index of the first stage to run (defaults to 0)
   * @throws Error if the executor is closed, or if it has a callback, was already executed
   *   and no starting stage index is given
   */
  public async execute(input?: any, startingStageIndex?: number) {
    if (this.closed) {
      throw new Error(
        `Attempting to execute closed pipeline executor ${this.getTracingInfo()}`,
      );
    }
    if (this.external && !this.external.executor) {
      this.external.executor = this;
    }
    if (this.executed && this.callback && startingStageIndex === undefined) {
      throw new Error(
        `Attempting to execute a pipeline that has a callback more than once: ${this.getTracingInfo()}`,
      );
    }
    this.executed = true;
    // Clear the page stack AND the end-of-pages flag; clearing only the stack would leave
    // hasPageForward() stuck at false after a previous run had paged to the end.
    this.resetPaging();
    await this.executeInternal({ input, startingStageIndex });
  }

  /**
   * Runs the stages starting at `startingStageIndex` without touching the paging state,
   * then invokes the callback (if any, and if the run was not aborted).
   *
   * If the executor has a callback and is already executing, the request is not run
   * immediately: it is remembered, the current run is flagged as aborting, and the
   * pipeline is re-run when the current run finishes.
   *
   * @param params.input              Optional values merged into the pipeline input
   * @param params.startingStageIndex Index of the first stage to run (defaults to 0)
   */
  public async executeInternal(params: {
    input?: any;
    startingStageIndex?: number;
  }) {
    const { input, startingStageIndex = 0 } = params;

    if (this.callback && this.executing) {
      // This happens only when the pipeline is processing a graphQL query callback, and
      // another one comes in from the network
      logger.debug(
        `CONCURRENT ATTEMPT pl: ${this.getTracingInfo()} ${
          this.sequenceNumber
        } ${startingStageIndex} concurrent index ${this.concurrentStageIndex}`,
      );
      // Start at the earliest stage when it's time. Whether a request is already pending
      // is tracked by executeAfterFinished rather than by the truthiness of the index,
      // because a pending index of 0 is valid and must not be overwritten by a later one.
      if (
        !this.executeAfterFinished ||
        this.concurrentStageIndex > startingStageIndex
      ) {
        this.concurrentStageIndex = startingStageIndex;
      }

      this.executeAfterFinished = true;
      this.concurrentInput = input;
      this.aborting = true;
      return;
    }

    if (input) {
      if (this.input) {
        Object.assign(this.input, input);
      } else {
        this.input = input;
      }
    }

    try {
      this.executing = true;
      this.executionIncarnation++;
      logger.debug(
        `Pipeline start ${this.getTracingInfo()} ${startingStageIndex} concurrent index ${
          this.concurrentStageIndex
        } executionInc: ${this.executionIncarnation}`,
      );
      // Name of the stage to jump to, SYSTEM_EXIT to stop, or null to fall through to
      // the next stage in order
      let nextStage = null;

      this.stageIndex = startingStageIndex;
      if (this.stageIndex < this.stages.length) {
        while (true) {
          if (nextStage === SYSTEM_EXIT || this.aborting) {
            break;
          }
          if (nextStage) {
            this.stageIndex = this.stages.findIndex(
              (s) => s.originalProperties._name === nextStage,
            );
            if (this.stageIndex === -1) {
              throw new Error(`nextStage ${nextStage} not found`);
            }
          } else {
            if (this.stageIndex >= this.stages.length) {
              break;
            }
          }
          const stage = this.stages[this.stageIndex];
          logger.debug(
            `Pipeline stage start ${this.getTracingInfo()} ${
              stage.originalProperties._name
            }`,
          );
          nextStage = await this.executeStage(stage);
          logger.debug(
            `Pipeline stage end ${this.getTracingInfo()} ${
              stage.originalProperties._name
            }`,
          );
          this.stageIndex++;
        }
      }
      if (this.callback && !this.aborting) {
        const executeCallback = () => {
          try {
            if (!this.closed) {
              logBatching.debug(
                `Executing callback - ${this.getTracingInfo()}`,
              );
              this.callback(this);
            } else {
              logBatching.debug(
                `Callback attempted when executor closed - ignoring - ${this.getTracingInfo()}`,
              );
            }
          } catch (error) {
            reThrow({
              logger,
              error,
              message: `Error executing callback ${this.getTracingInfo()}`,
            });
          }
        };

        // While callbacks are suppressed, hand the callback to the graphQL manager
        // instead of running it now
        if (this.pipelineManager.graphQLManager.suppressCallbacks > 0) {
          logBatching.debug(`Callback suppressed - ${this.getTracingInfo()}`);
          this.pipelineManager.graphQLManager.addCallback(
            this.sequenceNumber,
            executeCallback,
          );
        } else {
          executeCallback();
        }
      }
    } finally {
      logger.debug(
        `Pipeline end ${this.getTracingInfo()} ${startingStageIndex} concurrent index ${
          this.concurrentStageIndex
        }`,
      );
      this.executing = false;
      this.aborting = false;
      if (this.executeAfterFinished) {
        logger.debug(
          `CONCURRENT END Executing ${this.getTracingInfo()} - stageIndex: ${
            this.concurrentStageIndex
          }`,
        );
        const contextToUse = this.concurrentInput;
        const stageIndexToUse = this.concurrentStageIndex;
        this.concurrentInput = null;
        this.concurrentStageIndex = 0;
        this.executeAfterFinished = false;
        // NOTE: returning from `finally` replaces any exception thrown by the run that
        // just ended, so the caller observes the outcome of the re-run instead.
        return this.execute(contextToUse, stageIndexToUse);
      }
    }
  }

  /**
   * Executes a single stage: re-renders its string properties as Handlebars templates,
   * applies per-type property resets and input/output overrides, then calls the stage
   * type's executor.
   *
   * @returns SYSTEM_EXIT to stop the pipeline, the name of the stage to run next, or
   *   null to continue with the following stage
   * @throws Error if the stage type has no executor, if `_forEachArray` is set but the
   *   following stage is not marked `_forEachConsume`, or (via reThrow) if the stage
   *   executor fails while `_throwOnError` is set
   */
  private async executeStage(stage: StageImpl) {
    // Template context: pipeline input overlaid by the accumulated output
    const input = {
      ...this.input,
      ...this.output,
      // FIXME - rename these
      system_uuid: uuidv1(),
      system_dateTimeNow: new Date().toISOString(),
    };

    const { originalProperties, workingProperties } = stage;

    // Always render from the ORIGINAL string so templates are re-evaluated on every run;
    // current working properties take precedence over input/output in the context
    Object.keys(originalProperties).forEach((k) => {
      if (typeof originalProperties[k] === 'string') {
        workingProperties[k] = Handlebars.compile(originalProperties[k])({
          ...input,
          ...workingProperties,
        });
      }
    });

    // FIXME - find a better way to handle these defaults
    if (workingProperties._throwOnError === undefined) {
      workingProperties._throwOnError = true;
    }

    const si = this.pipelineManager.getStageInfo(workingProperties._stageType);

    Object.keys(si.typeInfo.properties).forEach((k) => {
      if (si.typeInfo.properties[k].resetBeforeExecution) {
        workingProperties[k] = originalProperties[k];
      }
      // Allow the properties to be populated from the input or output
      // (looked up under '_<property name>'; output wins over input)
      if (!workingProperties[k]) {
        const checkKey = `_${k}`;
        if (this.input && this.input[checkKey]) {
          workingProperties[k] = this.input[checkKey];
        }
        if (this.output && this.output[checkKey]) {
          workingProperties[k] = this.output[checkKey];
        }
      }
    });

    const stageExecutor =
      this.pipelineManager.stageInfos[originalProperties._stageType]?.executor;
    if (stageExecutor) {
      try {
        await stageExecutor(this, stage);
      } catch (error) {
        const pipelineID = `Pipeline: ${this.getTracingInfo()}\nStage: ${stage}`;

        let pipelineDump = '';
        if (!this.suppressPipelineDumpOnError) {
          pipelineDump = `:\nPipeline Input ${stringify({
            input: this.input,
            noThrow: true,
          })}`;
          pipelineDump += `:\nPipeline Output ${stringify({
            input: this.output,
            noThrow: true,
          })}`;
        }

        if (this.errorLogger) {
          this.errorLogger({ error, context: pipelineDump });
        }

        // Only objects can carry the extra property; assigning to a thrown primitive
        // would itself throw in strict mode and mask the original failure
        if (error && typeof error === 'object') {
          error.pipeline = pipelineID;
        }
        // This is too much
        // error.pipelineDump = pipelineDump;
        workingProperties._error = error;
        if (workingProperties._throwOnError) {
          reThrow({
            logger,
            error,
            message: 'Problem executing pipeline stage',
          });
        }
        logger.error(getErrorString(error));
      }
    } else {
      throw new Error(
        `No executor found for: ${stage.originalProperties._stageType}`,
      );
    }

    if (workingProperties._exitPipeline) {
      return SYSTEM_EXIT;
    }

    if (workingProperties._nextStage) {
      return workingProperties._nextStage;
    }

    if (workingProperties._forEachArray) {
      // Collect the contiguous run of stages right after this one that are marked
      // _forEachConsume
      const followingStages: StageImpl[] = [];
      for (let i = this.stageIndex + 1; i < this.stages.length; i++) {
        if (this.stages[i].workingProperties._forEachConsume) {
          followingStages.push(this.stages[i]);
        } else {
          break;
        }
      }

      if (followingStages.length === 0) {
        throw new Error(
          `_forEachArray was specified, but the following stage does not have _forEachConsume: ${stage.originalProperties._name}`,
        );
      }

      // Run every consuming stage once per item, exposing the item as _forEachItem
      for (const item of workingProperties._forEachArray) {
        for (const followingStage of followingStages) {
          followingStage.workingProperties._forEachItem = item;
          await this.executeStage(followingStage);
        }
      }

      // Resume after the consuming stages, which have already been run above
      const stageAfterIndex = this.stageIndex + 1 + followingStages.length;
      if (stageAfterIndex < this.stages.length) {
        return this.stages[stageAfterIndex].originalProperties._name;
      }
      return SYSTEM_EXIT;
    }

    // Go to the normal next stage
    return null;
  }

  // Used by stages that support paging

  /** Returns the entry on top of the page stack (undefined when the stack is empty). */
  public getNextPage(): IPage {
    return this.pages[this.pages.length - 1];
  }

  /** Pushes an entry onto the page stack. */
  public pushNextPage(nextPage: IPage) {
    this.pages.push(nextPage);
  }

  // Methods below used by consumers of the pipeline

  /** Empties the page stack and clears the end-of-pages flag. */
  public resetPaging() {
    this.pages = [];
    this.noMorePages = false;
  }

  /**
   * Returns true unless the end was reached: either `noMorePages` is set or the top of
   * the page stack is the null token.
   */
  public hasPageForward(): boolean {
    // The null token is pushed at the end
    return (
      !this.noMorePages &&
      (this.pages.length === 0 || this.pages[this.pages.length - 1] !== null)
    );
  }

  /** Returns true when the page stack holds more than one entry. */
  public hasPageBackward(): boolean {
    return this.pages.length > 1;
  }

  /**
   * Applies a new filter model: stores it in the variables of the paged graphQL query
   * stage, resets paging, and re-runs the pipeline without awaiting it. Does nothing
   * when the model is deep-equal to the current one.
   */
  public setFilterModel(filterModel: any) {
    if (deepEqual(this.filterModel, filterModel)) {
      return;
    }
    this.filterModel = filterModel;

    for (const stage of this.stages) {
      // Only one _paged stage is allowed
      if (
        stage.workingProperties._stageType === StageType.graphQLQuery &&
        stage.workingProperties._paged
      ) {
        if (!stage.workingProperties.variables) {
          stage.workingProperties.variables = {};
        }
        stage.workingProperties.variables[FILTER_MODEL] = filterModel;
      }
    }

    this.resetPaging();

    // Rerun async, will provide the results. Nothing awaits this promise, so a failure
    // is logged here instead of surfacing as an unhandled rejection.
    void this.executeInternal({}).catch((error) => {
      logger.error(getErrorString(error));
    });

    logger.debug(filterModel, 'pipelineExecutor - changing filterModel');
  }

  /**
   * Re-runs the pipeline from the paged stage to read the next page.
   *
   * @returns true if there are more pages
   * @throws Error if the end was already reached
   */
  public async pageForward(): Promise<boolean> {
    if (!this.hasPageForward()) {
      throw new Error('Attempt to pageForward after the end was reached');
    }
    const pageCount = this.pages.length;
    await this.executeInternal({ startingStageIndex: this.pagedStageIndex });
    // If the run pushed no new page entry, there is nothing further to read
    if (this.pages.length === pageCount) {
      this.noMorePages = true;
    }
    return this.hasPageForward();
  }

  /**
   * Pops the top page entry, re-runs the pipeline from the paged stage, then pops again.
   *
   * @returns true if there are more pages before the current one
   * @throws Error if the beginning was already reached
   */
  public async pageBackward(): Promise<boolean> {
    if (!this.hasPageBackward()) {
      throw new Error(
        'Attempt to pageBackward after the beginning was reached',
      );
    }
    this.pages.pop();
    await this.executeInternal({ startingStageIndex: this.pagedStageIndex });
    // NOTE(review): presumably discards the entry pushed by the re-run above - verify
    // against the paged stage implementation
    this.pages.pop();
    return this.hasPageBackward();
  }

  /**
   * Return graphql mutation on the object of the given type. Delegates to the manager's
   * metadataSupport, supplying this executor's configName.
   */
  public makeGraphqlObjectMutation({
    obj,
    typeName,
    fieldsToOmit,
  }: {
    obj: Record<string, any>;
    typeName: string;
    fieldsToOmit?: string[];
  }): string {
    return this.pipelineManager.metadataSupport.makeGraphqlObjectMutation(
      obj,
      typeName,
      this.configName,
      fieldsToOmit,
    );
  }

  /**
   * Converts a value node into a plain JavaScript value, recursing into object and list
   * nodes.
   *
   * @param kind      The node kind: 'IntValue', 'FloatValue', 'StringValue', 'EnumValue',
   *                  'BooleanValue', 'NullValue', 'Variable', 'ObjectValue' or 'ListValue'
   * @param value     The node; the members read depend on the kind (`value`, `name.value`,
   *                  `fields` or `values`)
   * @param name      Field name, used in error messages
   * @param variables Map used to resolve 'Variable' nodes by name
   * @throws Error for an unsupported kind, or when an object node repeats a field name
   */
  public convertValue(kind, value, name, variables) {
    let localObjValue;
    switch (kind) {
      case 'IntValue':
        localObjValue = parseInt(value.value, 10);
        break;
      case 'FloatValue':
        localObjValue = parseFloat(value.value);
        break;
      case 'StringValue':
      case 'EnumValue':
      case 'BooleanValue':
        localObjValue = value.value;
        break;
      case 'NullValue':
        localObjValue = null;
        break;
      case 'Variable':
        localObjValue = variables[value.name.value];
        break;
      case 'ObjectValue': {
        localObjValue = {};
        value.fields.forEach((field) => {
          // Own-property check: a truthy lookup would mistake inherited members such as
          // `toString` for duplicates and would miss duplicates whose first value is
          // falsy (0, '', null, false)
          if (
            Object.prototype.hasOwnProperty.call(
              localObjValue,
              field.name.value,
            )
          ) {
            throw new Error(
              `Field: ${field.name.value} already exists in ${stringify({
                input: localObjValue,
                noThrow: true,
              })}`,
            );
          }
          localObjValue[field.name.value] = this.convertValue(
            field.value.kind,
            field.value,
            field.name.value,
            variables,
          );
        });
        break;
      }
      case 'ListValue': {
        localObjValue = [];
        value.values.forEach((entry) => {
          localObjValue.push(
            this.convertValue(entry.kind, entry, name, variables),
          );
        });
        break;
      }
      default:
        throw new Error(
          `Unsupported graphQL mutation value: '${kind}' ` +
            `for field: ${stringify({
              input: name,
              noThrow: true,
            })} (the value might be too complex), ` +
            `\nvalue: ${stringify({ input: value, noThrow: true })}`,
        );
    }
    return localObjValue;
  }

  /**
   * Converts an argument's value with `convertValue` and always returns an array: the
   * converted value itself when it is an array, otherwise a one-element array holding it.
   */
  public convertNestedObject(argument, variables): Record<string, any>[] {
    // fieldUsed.nested = {};
    const obj = this.convertValue(
      argument.value.kind,
      argument.value,
      argument.name.value,
      variables,
    );
    if (Array.isArray(obj)) {
      return obj;
    }
    return [obj];
  }

  /**
   * Stores `result` on the stage and in `output` under the stage's name. When the
   * record-query logger has Info enabled, object results (and object elements of array
   * results) are first tagged with a `__resultId` naming this pipeline and stage.
   */
  public recordResult(stage: StageImpl, result: any) {
    // Used to make sure we know where this result comes from
    // For the map query, the __resultId will interfere with the results
    if (
      logRecordQuery.isLevelEnabled(LogLevels.Info) &&
      typeof result === 'object' &&
      (stage.workingProperties._stageType !== StageType.graphQLQuery ||
        !stage.workingProperties.outputAsMap)
    ) {
      const resultId =
        this.getTracingInfo() + ' ' + stage.workingProperties._name;
      if (result) {
        result.__resultId = resultId;
        if (Array.isArray(result)) {
          result.forEach((e) => {
            // typeof null is 'object', so null elements must be skipped explicitly
            if (e !== null && typeof e === 'object') {
              e.__resultId = resultId;
            }
          });
        }
      }
    }
    stage.result = result;
    logger.trace(
      stage.result,
      `Record result: ${stage.originalProperties._name}/${
        StageType[stage.originalProperties._stageType]
      }:`,
    );
    this.output[stage.originalProperties._name] = stage.result;
  }

  /**
   * Marks the executor closed, calls the `close` hook of each stage's type (when it has
   * one), and notifies the manager. Safe to call when `init()` never ran.
   */
  public close() {
    logger.debug(`Pipeline close - ${this.getTracingInfo()}`);
    this.closed = true;

    // stages is only populated by init(), so tolerate it being absent
    (this.stages ?? []).forEach((stage) => {
      const si = this.pipelineManager.getStageInfo(
        stage.originalProperties._stageType,
      );
      if (si?.close) {
        si.close(this, stage);
      }
    });

    this.pipelineManager.close(this);
  }

  /** Returns the tracing label followed by the string form of the stages. */
  public toString() {
    return `Pipeline: ${this.getTracingInfo()}\nstages: \n${this.stages}`;
  }
}
