import { ApolloClient, NormalizedCacheObject } from '@apollo/client/core';
import {
  InvokeWithResponseStreamCommandInput,
  InvokeWithResponseStreamCommandOutput,
} from '@aws-sdk/client-lambda';
import { S3 } from '@aws-sdk/client-s3';
import { SQS } from '@aws-sdk/client-sqs';
import { isBrowser } from 'browser-or-node';
import ExcelJs from 'exceljs/dist/exceljs.min.js';
import {
  BooleanValueNode,
  OperationDefinitionNode,
  StringValueNode,
  VariableNode,
  valueFromASTUntyped,
} from 'graphql';
import gql from 'graphql-tag';
import _ from 'lodash';
import PQueue from 'p-queue';
import safeJsonStringify from 'safe-json-stringify';
import { v1 as uuidv1 } from 'uuid';

import { FETCH_POLICY } from '../apolloClient/graphQlClient';
import { ClientManager } from '../clientManager';
import { HEADER_REMOTE_CONTEXT, SYSTEM } from '../common/commonConstants';
import { exists, getLambdaFunctionFullName } from '../common/commonUtilities';
import { reThrow } from '../errors/errorLog';
import { getErrorString } from '../errors/errorString';
import { RetryFunction, retry } from '../errors/retry';
import { LambdaSupport, PIPELINE_HANDLER_FUNCTION } from '../lambdaSupport';
import { Loggers, getLogger } from '../loggerSupport';
import { MetadataSupport } from '../metadataSupport';
import {
  AGGREGATE_PIPELINE_OUTPUT,
  APP_DEF_PIPELINE,
  CHUNK_ID,
  CONFIG_SEPARATOR,
  CONNECTION_COUNT,
  CREATION_TIME,
  DEACTIVATION_DATE,
  DIRECTIVE_ACTIVEASOF,
  DIRECTIVE_IGNORE_SIZECLASS,
  DIRECTIVE_OUTPUTPIPELINE,
  DIRECTIVE_OUTPUTPIPELINE_AGGREGATE_PIPELINE,
  DIRECTIVE_OUTPUTPIPELINE_ARGUMENTS,
  DIRECTIVE_OUTPUTPIPELINE_LOGEXECUTION,
  DIRECTIVE_OUTPUTPIPELINE_LOGOUTPUT,
  DIRECTIVE_OUTPUTPIPELINE_NAME,
  FETCH_POLICY_NO_CACHE,
  IEntityType,
  ITEMS,
  NEXT_TOKEN,
  PAGE_NUMBER,
  PIPELINE_EXECUTION,
  QualifiedName,
  SCANNED_COUNT,
  TYPE_NAME,
  UPDATE_TIME,
} from '../metadataSupportConstants';
import { IMetadataRequest, SchemaManager, UseTestData } from '../schemaManager';
import {
  ITestContext,
  TestWhatsHappening,
  handleTestRequest,
} from '../testSupport';
import { TypeDefinition } from '../typeDefinition';
import { SimpleType, StageType } from '../types';
import {
  getLocalIsoDate,
  getLocalIsoTimestamp,
  isNullOrUndefined,
  performanceTimes,
  sleep,
  stringify,
  stringifyPretty,
  waitFor,
} from '../utilityFunctions';

import { ConvertType, GraphQLManager } from './graphQLManager';
import { getGraphqlQueryFieldName } from './graphQLSupport';
import {
  getFieldNameInfo,
  getTopLevelDirective,
  makeGraphQLArgumentsString,
} from './graphQLSupportAst';
import { PipelineExecutor } from './pipelineExecutor';
import * as statusAnalysis from './pipelineStatusAnalysis';
import {
  getCountSummary,
  getRemoteAsyncPipelineStatusTree,
  printCountSummary,
  printRemoteAsyncPipelineStatusTree,
} from './pipelineStatusAnalysis';
import {
  IStageExecutor,
  IStageInfo,
  IStageInfos,
  IStageProperties,
} from './stage';
import { StageImpl } from './stageImpl';
import { initialize as emailInit } from './stageTypes/email';
import { initialize as execPipelineInit } from './stageTypes/execPipeline';
import {
  IStagePropertiesGraphQLMutation,
  initialize as graphQLMutationInit,
} from './stageTypes/graphQLMutation';
import { initialize as graphQLQueryInit } from './stageTypes/graphQLQuery';
import { initialize as httpInit } from './stageTypes/http';
import { initialize as javascriptInit } from './stageTypes/javascript';
import { initialize as lambdaInit } from './stageTypes/lambda';
import { initialize as s3Init } from './stageTypes/s3';
import { stageInfos } from './stageTypes/stageInfos';
import { initialize as typescriptInit } from './stageTypes/typescript';

// import LZString from 'lz-string';

const logger = getLogger({ name: Loggers.PIPELINE_MANAGER });

export const HTTP_METHODS = ['GET', 'POST', 'PUT', 'DELETE'];

// Internal use only, this is only used to get up the GlobalConfig
export const CONCURRENCY_UPDATE_INTERVAL_DEFAULT = 2 * 60 * 1000;
export const CONCURRENCY_UPDATE_INCREMENT_DEFAULT = -2;

// Values to reduce the remaining function times for in timeout situations
// We need to account for the start time of a cold-started lambda function here
export const TIMEOUT_BUFFER_MUTATION = 15000;
export const TIMEOUT_BUFFER_FUNCTION = 13000;

// Whenever there is a timeout, this is the amount to increase by
export const TIMEOUT_ADJUSTMENT_SECONDS = 10;

// Amount of time to wait for an SQS message to be sent
export const TIMEOUT_ASYNC_SEND = 5000;

export const SYSTEM_EXIT = 'system_exit';

export const STACK_INSTALL_PIPELINE = '_stackInstall';
export const STACK_UNINSTALL_PIPELINE = '_stackUninstall';

export const EXECUTION_ID = 'executionId';

// Default is something that can work for the browser
const BATCH_MUTATE_CONCURRENCY_BROWSER = 5;
const REPLACE_DIRECTIVE_DEFAULT = '@replace(path: "/*")';
//
// WARNING - when IRemoteContext, IPipelineRemoteParams, or IPipelineRemoteParamsAsync is changed, the PipelineManager.getRemoteArgs()
// needs to be changed to align with this. All of the fields in these interfaces must be conveyed
// using getRemoteArgs()
//

// An object that is sent with all remote communications
export interface IRemoteContext {
  clientId?: string;

  // WARNING - update getRemoteArgs and/or getGraphQLRequestContext (GraphQL operations) when changing this

  // This is generated at the root of a request, or propagated if the request
  // continues (graphQL query and then pipelines executed as a result of that
  // for example)
  requestId?: string;

  // For a specific execution of a pipeline/graphql (possibly within a request)
  executionId?: string;

  // For batch mutation operations
  batchId?: string;

  // Used to resolve any unqualified name references in the execution of the pipeline
  configName?: string;

  // Specifies the execution configuration for the pipeline. Provided only in cases
  // where the execution configuration is not set in the schema manager (for example
  // in the apollohandler).
  executionConfigName?: string;

  // Provided only in cases where the useTestData is not set in the schema manager
  // (for example in the apolloHandler)
  useTestData?: UseTestData;

  // Sequence number fo this part of a chunk
  pageNumber?: number;

  // For a GraphQLQuery, corresponds to the outputAsMap property
  outputAsMap?: boolean;

  // For a GraphQLQuery, corresponds to the orderBy property
  orderBy?: string;

  // For a GraphQLQuery, corresponds to the ascDesc property
  ascDesc?: string;

  // The server may react differently depending on the fetch policy
  fetchPolicy?: string;

  // Corresponds to the getting of this property in the caller
  enableNetworkSubscriptions?: boolean;

  testContext?: ITestContext;

  // The execution id of the pipeline which called this one
  callingExecutionId?: string;

  // Execute the query locally if this is running in the apolloHandler
  useLocalLink?: boolean;

  // Current time for the client
  clientLocalTimestamp?: string;

  // Requesting userId, this is used in server-server communications to allow this request on behalf
  // of any user id (for example the REST API handler). This userId can only be trusted if the
  // idToken is the trusted devops user.
  requestingUserId?: string;
}

// This should match TypeDefinition/PipelineExecutionType.yml
export enum PipelineExecutionType {
  TRIGGER_NEXT_PAGE = 'TRIGGER_NEXT_PAGE',
  COMPLETE_MUTATION_AFTER_TIMEOUT = 'COMPLETE_MUTATION_AFTER_TIMEOUT',
  TO_DEAD_LETTER_QUEUE = 'TO_DEAD_LETTER_QUEUE',
  REGULAR_PIPELINE = 'REGULAR_PIPELINE',
  REMOTE_MUTATION = 'REMOTE_MUTATION',
  UPDATE_EXECUTION_LOG = 'UPDATE_EXECUTION_LOG',
}

interface IPipelineRemoteExecutionLoggingParams {
  // WARNING - update getRemoteArgs when changing this
  pipelineExecutionType?: PipelineExecutionType;

  // Logs all output in _PipelineExecution.output
  // (turn this on only if the output is not too big, or has spaces in it,
  // otherwise there will be problems)
  logOutput?: boolean;

  // Indicates a record of this pipeline execution is kept. This is required when
  // you want to wait for the execution of the pipeline, or use an output pipeline
  // with aggregation
  doRemoteAsyncLogging?: boolean;
}

// Internal use only, fields that need to be conveyed for remote execution
export interface IPipelineRemoteParams
  extends IRemoteContext,
    IPipelineRemoteExecutionLoggingParams {
  // WARNING - update getRemoteArgs when changing this
  name?: QualifiedName;
  stages?: IStageProperties[];
  input?: Record<string, any>;
}

// Internal use only, between the pipelineHandler and its caller
export interface IPipelineRemoteResponse {
  response?: any;
  // stringified error
  error?: string;
  errorStatusCode?: number;
}

export interface IPipelineRemoteAsyncMessageAttribute {
  DataType: string;
  // Used for the SQS params sending the message
  StringValue?: string;
  // Used when reading the message
  stringValue?: string;
}

export interface IPipelineRemoteAsyncMessageAttributes {
  S3UUID: IPipelineRemoteAsyncMessageAttribute;
  ExecutionId: IPipelineRemoteAsyncMessageAttribute;
  RequestId: IPipelineRemoteAsyncMessageAttribute;
  ConfigName: IPipelineRemoteAsyncMessageAttribute;
  Payload: IPipelineRemoteAsyncMessageAttribute;
}

export enum ExecutionStatus {
  START,
  ACTIVE,
  // Order matters, anything below here is FINISHED
  FINISHED,
  MUTATION_TIMEOUT,
  TIMEOUT,
}

interface IExecutionInfo extends IExecutePipelineRemoteAsyncParams {
  executionStatus: ExecutionStatus;
  receiptHandle?: string;
  mutationTimedOut?: boolean;
  mutationTimedOutRetryExecutionId?: string;
  sqsRecord?: any;
  sentTimeMs?: number;
  startTimeMs?: number;
  s3Payload?: boolean;
  logId: string;
}

export interface IExecutionInfoMap {
  [executionId: string]: IExecutionInfo;
}

// Used for all servers (pipeline handler, pipeline handler queued, apollo handler)
export interface IServerExecutionContext {
  remoteContext?: IRemoteContext;

  // Separate from the remoteContext because sometimes the testContext
  // is needed when the remoteContext is not available (like in piplineHandlerQueued)
  testContext?: ITestContext;

  // Overall timeout value of the function
  timeoutMs?: number;

  // Start time of the invocation of the function, used to compute the remaining time
  functionStartTimeMs?: number;

  // Used to get the (modified to be unique) callingExecutionId in a mutation
  // timeout completion.
  executionInfoMap?: IExecutionInfoMap;
}

export interface IPipelineExternal {
  write: (path, obj) => void;
  writeSelf: (path, obj) => void;
  writeGlobal?: (path, obj) => void;
  writeParent?: (widgetName, path, data) => void;
  append: (path, obj) => void;
  appendSelf: (path, obj) => void;
  appendGlobal?: (path, obj) => void;
  appendParent?: (widgetName, path, data) => void;
  executor?: PipelineExecutor;
}

// Don't use this directly, defined as a class so we can get a list of the fields
// on the interface.
class CPipelineExecutionSummary {
  public recordCount?: number = 0;
  public nextToken?: any = '';
  public chunkId?: number = 0;
  public pageNumber?: number = 0;
  public waitNotifyToken?: string;
}

// Use this interface, not the class above
// tslint:disable-next-line:no-empty-interface
export type IPipelineExecutionSummary = CPipelineExecutionSummary;

// This is used in the pipeline execution logs written in the database
export const getPipelineExecutionSummaryKeys = () =>
  Object.keys(new CPipelineExecutionSummary());

// Aligned with the typedef _PipelineExecution
// This is the database record that records the execution of pipelines
export interface IPipelineExecution {
  // Execution Id
  id: string;
  executionId?: string;
  requestId?: string;
  pipelineName?: string;
  pipelineExecutionType?: PipelineExecutionType;
  // ISO 8601 dates
  sentTime?: string;
  startTime?: string;
  endTime?: string;
  elapsedFromSentTime?: number;
  executionTime?: number;
  errorString?: string;
  sqsTryCount?: number;
  timedOut?: boolean;
  mutationTimedOut?: boolean;
  timedOutRetryExecutionId?: string;
  mutationTimedOutRetryExecutionId?: string;
  outputSummary?: IPipelineExecutionSummary;
  inputSummary?: IPipelineExecutionSummary;
  output?: any;
  sqsRecord?: any;
  chunkId?: number;
  pageNumber?: number;
  inProgress?: string;
  callingExecutionId?: string;
}

// WARNING - Must be aligned with code in loadgraphql, defining the "ConnectionType"
export interface IQueryConnectionResult {
  [ITEMS]?: any[];
  [NEXT_TOKEN]?: string;
  [CONNECTION_COUNT]?: number;
  requestId?: string;
  executionId?: string;
  outputPipelineExecutionId?: string;
  [CHUNK_ID]?: number;
  [PAGE_NUMBER]?: number;
  // Number of records scanned for this query
  [SCANNED_COUNT]?: number;
  [AGGREGATE_PIPELINE_OUTPUT]?: any;
  [TYPE_NAME]?: string;
}

// WARNING - must match above (should generate this with fancy Typescript stuff)
export const QueryConnectionResultFields = {
  items: true,
  [NEXT_TOKEN]: true,
  [CONNECTION_COUNT]: true,
  requestId: true,
  executionId: true,
  outputPipelineExecutionId: true,
  [CHUNK_ID]: true,
  [PAGE_NUMBER]: true,
  [SCANNED_COUNT]: true,
  [AGGREGATE_PIPELINE_OUTPUT]: true,
};

export interface IMutationResult {
  result?: any;
  requestId?: string;
  remoteAsyncPipelineExecutionId?: string;
  retryPipelineExecutionId?: string;
}

export interface IExecuteQueryMutationParams extends IRemoteContext {
  input?: any;
  variables?: any;
  fetchPolicy?: string;
  noCache?: boolean;
}

export interface IExecuteQueryParams extends IExecuteQueryMutationParams {
  query: string;
  outputAsMap?: boolean;
  orderBy?: string;
  ascDesc?: string;
  returnConnectionResult?: boolean;
  pageCallback?:
    | ((records: any[]) => Promise<void>)
    | ((records: any[]) => void);
  callback?: (executor: PipelineExecutor) => any;
}

export interface IExecuteMutationParams extends IExecuteQueryMutationParams {
  mutation: string;
  remoteAsync?: boolean;
  remoteAsyncLoggingParams?: IPipelineRemoteExecutionLoggingParams;
  pipelineExecutor?: PipelineExecutor;
}

export interface ICreatePipelineExecutorParams extends IPipelineRemoteParams {
  inputGlobal?: Record<string, any>;
  inputWidget?: Record<string, any>;
  category?: string;
  errorLogger?: (object) => any;
  tracingIdentifier?: string;
  sequenceNumber?: number;
  external?: IPipelineExternal;

  // Used when data associated with a query has changed, the provided executor will have
  // been executed and have the new data
  callback?: (executor: PipelineExecutor) => any;
}

export interface IExecutePipelineRemoteAsyncParams
  extends ICreatePipelineExecutorParams,
    IPipelineRemoteParams {
  // The part of the queue name after the BASE_QUEUE_NAME
  queueName?: string;

  // The full queue name, used to reference the dead letter queue
  actualQueueName?: string;

  // If true, resolve the promise once the pipeline is finished, returns
  // an IPipelineExecution object describing the execution
  waitForCompletion?: boolean;

  // For internal use only, used if a message timed out, but is in S3, we want to
  // use this S3 uuid
  s3UUID?: string;
}

export interface IExecutePipelineRemoteParams
  extends ICreatePipelineExecutorParams {
  retryFunction?: RetryFunction;
  suppressErrorLogging?: boolean;
}

export interface IOutputPipelineInfo {
  hasOutputPipeline: boolean;
  outputPipeline?: string;
  logOutput?: boolean;
  logExecution?: boolean;
  outputPipelineArgs?: any[];
  aggregatePipeline?: string;
}

// FIXME - this needs to go somewhere better
export function makeExecutionIdUnique({
  executionId,
  sqsTryCount,
}: {
  executionId: string;
  sqsTryCount: number;
}) {
  return `${executionId}_${sqsTryCount}`;
}

export class PipelineManager {
  public clientManager: ClientManager;
  public schemaManager: SchemaManager;
  public metadataSupport: MetadataSupport;

  public apolloClient: ApolloClient<NormalizedCacheObject>;
  public sqsClient: SQS;
  public s3Client: S3;

  // Library functions for Javascript stages
  public javascriptStageLibraries: any;

  // User defined functions for Javascript stages
  public javascriptStageFunctions: any;

  public stageInfos: IStageInfos = stageInfos;

  public pipelineIdCounter = 0;
  public suppressPipelinesOnMutations: boolean | string[];
  public openPipelineCount: Record<string, any>;

  public fetchPolicy: string = FETCH_POLICY;

  public suppressPipelineDumpOnError: boolean;

  // Override searching for config objects for testing purposes
  public localSourceRoot: string;

  public executors: Set<PipelineExecutor> = new Set<PipelineExecutor>();

  public graphQLManager: GraphQLManager;

  // Only the browser clients generally care about this, sometimes tests need this, but they
  // can ask for it on demand. This is used to set this property in the remote context
  // which controls publishing notifications on a mutation. It also controls listening for
  // remote notifications by subscribing.
  public enableNetworkSubscriptions = isBrowser;

  // Used only for the tests to cause a delay before query refresh to account for
  // the Dynamodb eventual read consistency
  public waitForDynamoConsistency: boolean;

  public batchMutationConcurrencyOverride: number;

  public nextQueryId = 1;

  // Used to implement the @suppresslocalnotify directive and avoid re-triggering a potentially
  // large query that's unnecessary because the only purpose of the query is to will in an object
  // (like a grid) that has already reflected the change
  public processingSuppressLocalNotifyCount = 0;

  // Used with the above to indicate the current highest queryId. this handles a race condition where
  // a new query may be started in the middle of a @suppresslocalnotify mutation. In this case,
  // we want the new query to proceed.
  public processingSuppressLocalNotifyQueryId: number;

  public refreshCallback: any;

  private pipelineCache: any;

  public statusAnalysis = statusAnalysis;

  // Used to allow requests on behalf of this userId for access control purposes
  public requestingUserId: string;

  // For use with the @outputpipeline directive, this field is an IQueryConnectionResult on the input
  public QUERY_RESULT = '_queryResult';

  public CONSOLIDATED_OUTPUTS = '_consolidatedOutputs';
  public AGGREGATE_PIPELINE_OUTPUT = '_aggregatePipelineOutput';

  constructor() {
    this.openPipelineCount = {};
    this.pipelineCache = {};

    this.javascriptStageLibraries = { uuid: uuidv1, ExcelJs };
    this.javascriptStageFunctions = {};
  }

  public initialize(clientManager: ClientManager) {
    this.clientManager = clientManager;
    this.apolloClient = clientManager.apolloClient;
    this.sqsClient = clientManager.sqsClient;

    this.schemaManager = clientManager.schemaManager;
    this.metadataSupport = this.schemaManager.metadataSupport;
    this.graphQLManager = new GraphQLManager(this);

    // Don't use dynamic imports to do this because that's too slow
    emailInit(this.stageInfos[StageType.email]);
    execPipelineInit(this.stageInfos[StageType.execPipeline]);
    graphQLMutationInit(this.stageInfos[StageType.graphQLMutation]);
    graphQLQueryInit(this.stageInfos[StageType.graphQLQuery]);
    httpInit(this.stageInfos[StageType.http]);
    javascriptInit(this.stageInfos[StageType.javaScript]);
    lambdaInit(this.stageInfos[StageType.lambda]);
    s3Init(this.stageInfos[StageType.s3]);
    typescriptInit(this.stageInfos[StageType.typeScript]);

    Object.keys(this.stageInfos).forEach((sik) => {
      const stageInfo = this.stageInfos[sik];
      if (!stageInfo.executor) {
        throw new Error(
          `No executor found for stage: ${stageInfo.typeInfo.moduleName}`,
        );
      }
    });
  }

  public getPipelineAsyncS3Info(params: { uuid?: string }) {
    return this.clientManager.s3Support.getS3BucketAndKey({
      purpose: 'asyncpipelinedata',
      id: params.uuid,
    });
  }

  public getStageExecutor(stageType: StageType): IStageExecutor {
    return this.getStageInfo(stageType).executor;
  }

  public getStageInfo(stageType: StageType): IStageInfo {
    const info = this.stageInfos[stageType];
    if (!info) {
      throw new Error(`Stage info for stage type: ${stageType} not found`);
    }
    return info;
  }

  public setRefreshCallback(cb) {
    this.refreshCallback = cb;
  }

  public refreshGui() {
    if (this.refreshCallback) {
      this.refreshCallback();
    }
  }

  public getBatchMutateConcurrencyLevel() {
    if (this.batchMutationConcurrencyOverride) {
      return this.batchMutationConcurrencyOverride;
    }
    if (isBrowser) {
      return BATCH_MUTATE_CONCURRENCY_BROWSER;
    }
    return LambdaSupport.isInAwsRealLambdaServer() ? 30 : 10;
  }

  public static remoteIdString(params: {
    requestId?: string;
    executionId?: string;
    remoteContext?: IRemoteContext;
  }): string {
    const { requestId, executionId } = params.remoteContext
      ? params.remoteContext
      : params;
    const batchString = params.remoteContext?.batchId
      ? ` batch: ${params.remoteContext.batchId}`
      : '';
    if (executionId === requestId) {
      return `exe&req: ${executionId}${batchString}`;
    }
    return `exe/req: ${executionId} / ${requestId}${batchString}`;
  }

  public static remoteIdObject(remoteContext: IPipelineRemoteParams): {
    name?: string;
    requestId?: string;
    executionId?: string;
    batchId?: string;
  } {
    const { name, requestId, executionId, batchId } = remoteContext;
    return { name, requestId, executionId, batchId };
  }

  public async createNamedPipeline({
    name,
    stages,
    skipLambdaRestart = false,
  }: {
    name: string;
    stages: IStageProperties[];
    skipLambdaRestart?: boolean;
  }) {
    try {
      // Throws if this is not a qualified name
      MetadataSupport.getQualifiedName(name, null);
      await this.executeGraphqlMutation({
        mutation: `mutation cnp($stages: [Object]) @overwrite { create${APP_DEF_PIPELINE}Internal(input: { id:"${name}" stages: $stages configName:"${
          name.split(CONFIG_SEPARATOR)[0]
        }"} ) { id } }`,
        variables: { stages },
      });

      // FUCK dynamo
      await sleep(5000);

      // Invalidate the cache and the other servers
      this.pipelineCache = {};
      if (!skipLambdaRestart) {
        await this.clientManager.lambdaSupport.restartLambdaFunctions();
      }
    } catch (error) {
      throw new Error(
        `Failed to create pipeline: ${name}: ${getErrorString(error)}`,
      );
    }
  }

  public async deleteNamedPipeline({ name }: { name: string }) {
    try {
      await this.executeGraphqlMutation({
        mutation: `mutation { delete${APP_DEF_PIPELINE}Internal(input: { id:"${name}" } ) { id } }`,
      });
      // Invalidate the cache
      this.pipelineCache = {};
    } catch (error) {
      throw new Error(
        `Failed to delete pipeline: ${name}: ${getErrorString(error)}`,
      );
    }
  }

  public countPipelineOpen(category: string) {
    if (!this.openPipelineCount[category]) {
      this.openPipelineCount[category] = 0;
    }
    this.openPipelineCount[category]++;
  }

  public countPipelineClose(category: string) {
    if (!this.openPipelineCount[category]) {
      throw new Error(
        'trying to count a closed pipeline that was not counted when it opened',
      );
    }
    this.openPipelineCount[category]--;
  }

  public isSuppressingLocalNotifyQueries(queryId: number): boolean {
    return (
      this.processingSuppressLocalNotifyCount > 0 &&
      queryId < this.processingSuppressLocalNotifyQueryId
    );
  }

  public createPipelineExecutor(
    params: ICreatePipelineExecutorParams,
  ): PipelineExecutor {
    const pe = new PipelineExecutor(this, {
      ...params,
      sequenceNumber: ++this.pipelineIdCounter,
    });
    pe.init();
    pe.suppressPipelineDumpOnError = this.suppressPipelineDumpOnError;
    this.executors.add(pe);
    this.countPipelineOpen(params.category);
    return pe;
  }

  public async createPipelineExecutorAndExecute(
    params: ICreatePipelineExecutorParams,
  ): Promise<PipelineExecutor> {
    const pe = this.createPipelineExecutor(params);
    await pe.execute();
    return pe;
  }

  // This is intended only to be used to allow our GraphQL implementation to be plugged
  // into other clients like GraphICal.
  public async executeGraphqlWithParams(params: any) {
    const gqlQuery = gql(params.query);
    const opDef = gqlQuery.definitions[0] as OperationDefinitionNode;
    if (opDef.operation === 'mutation') {
      const { result } = await this.executeGraphqlMutation({
        mutation: params.query,
        variables: params.variables,
      });

      return result;
    }
    const result = await this.executeGraphqlQuery({
      query: params.query,
      returnConnectionResult: true,
    });
    return result;
  }

  // Used to execute a graphql query using the pipeline mechanism
  public async executeGraphqlQuery(
    params: IExecuteQueryParams,
  ): Promise<IQueryConnectionResult | any> {
    const {
      query,
      input,
      useTestData,
      configName,
      executionConfigName,
      testContext,
      pageCallback,
      returnConnectionResult,
      callback,
    } = params;
    const stageName = this.calculateStageName(query, input);

    const pe = this.createPipelineExecutor({
      stages: [
        {
          _stageType: StageType.graphQLQuery,
          _name: stageName,
          ...params,
        },
      ],
      input,
      useTestData,
      executionConfigName,
      configName,
      testContext,
      callback,
    });
    await pe.execute();

    if (callback) {
      return;
    }

    if (pageCallback) {
      await this.handlePaging({ peRead: pe, pageCallback });
      pe.close();
      return;
    }

    pe.close();

    const result = this.getResultFromPipelineResult({
      ...params,
      stageName,
      pipelineExecutor: pe,
    });

    if (returnConnectionResult && pe.stages[0].stageQuery.isListQuery) {
      return {
        ...pe.properties[stageName].connectionResult,
      };
    }

    return result;
  }

  // Used to execute a graphql mutation using the pipeline mechanism
  public async executeGraphqlMutation(
    params: IExecuteMutationParams,
  ): Promise<IMutationResult> {
    const {
      input,
      mutation,
      variables,
      useTestData,
      executionConfigName,
      configName,
      testContext,
      remoteAsync,
      pipelineExecutor,
    } = params;
    const stageName = this.calculateStageName(mutation, {
      ...variables,
      ...input,
    });

    const pipelineParams: ICreatePipelineExecutorParams = {
      stages: [
        {
          _stageType: StageType.graphQLMutation,
          _name: stageName,
          ...params,
        },
      ],
      input,
      useTestData,
      executionConfigName,
      configName,
      testContext,
    };

    if (remoteAsync) {
      pipelineParams.pipelineExecutionType =
        PipelineExecutionType.REMOTE_MUTATION;

      if (pipelineExecutor) {
        pipelineParams.requestId = pipelineExecutor.requestId;
      }

      const remoteAsyncPipelineExecutionId =
        (await this.executePipelineRemoteAsync({
          ...params,
          ...pipelineParams,
          ...params.remoteAsyncLoggingParams,
        })) as string;

      return {
        remoteAsyncPipelineExecutionId,
      };
    }

    const pe = this.createPipelineExecutor(pipelineParams);
    await pe.execute();
    pe.close();

    const result = this.getResultFromPipelineResult({
      ...params,
      stageName,
      pipelineExecutor: pe,
    });

    return {
      result,
      requestId: pe.requestId,
      retryPipelineExecutionId: (
        pe.stages[0].workingProperties as IStagePropertiesGraphQLMutation
      ).remoteAsyncPipelineExecutionId,
    };
  }

  private getResultFromPipelineResult(params: {
    pipelineExecutor: PipelineExecutor;
    query?: string;
    mutation?: string;
    stageName: string;
    input?: any;
  }): IQueryConnectionResult | any {
    const { pipelineExecutor, query, mutation, stageName, input } = params;

    const pipelineResult = pipelineExecutor.output;

    const queryFieldName = getGraphqlQueryFieldName(query || mutation, input);

    let realResult;

    if (
      pipelineResult[stageName] &&
      pipelineResult[stageName][queryFieldName]
    ) {
      realResult = pipelineResult[stageName][queryFieldName];
    } else {
      realResult = pipelineResult[stageName];
    }

    return realResult;
  }

  // Used for all remote pipeline executions
  private getRemoteArgs(params: IPipelineRemoteParams): IPipelineRemoteParams {
    // WARNING - this needs to get exactly the members of IPipelineRemoteParams (and its super interfaces)
    // Typescript does not provide a way to introspect the interfaces, so this need to be done
    // manually

    const executionConfigName =
      params.executionConfigName ||
      this.clientManager.schemaManager.executionConfigName;

    function removePipelineExecutor(stage: IStageProperties): IStageProperties {
      const clonedStage = _.clone(stage);

      delete clonedStage.pipelineExecutor;

      return clonedStage;
    }

    return {
      // IRemoteContext
      // WARNING - when updating this, also look at getGraphQLRequestContext (below)
      clientId: this.clientManager.clientId,
      requestId: params.requestId || uuidv1(),
      // Sometimes the executionId can be provided
      executionId: params.executionId || uuidv1(),
      configName: params.configName,
      executionConfigName,
      useTestData:
        params.useTestData || this.clientManager.schemaManager.useTestData,
      testContext: params.testContext,
      pageNumber: params.pageNumber || 0,
      fetchPolicy: params.fetchPolicy || this.fetchPolicy,
      enableNetworkSubscriptions:
        params.enableNetworkSubscriptions || this.enableNetworkSubscriptions,
      clientLocalTimestamp: getLocalIsoTimestamp(),

      // IPipelineRemoteParams
      name: MetadataSupport.getQualifiedName(params.name, params.configName),
      stages: params.stages?.map(removePipelineExecutor),
      input: params.input,
      pipelineExecutionType:
        params.pipelineExecutionType || PipelineExecutionType.REGULAR_PIPELINE,
      doRemoteAsyncLogging: params.doRemoteAsyncLogging,
      logOutput: params.logOutput,
      callingExecutionId: params.callingExecutionId,
      requestingUserId: this.requestingUserId,
    };
  }

  // Used for GraphQL requests to the apolloHandler
  public getGraphQLRequestContext(stage: StageImpl) {
    const { executor } = stage;
    const { schemaManager } = this;

    const executionConfigName =
      executor.executionConfigName || schemaManager.executionConfigName;

    // IRemoteContext
    // WARNING - when updating this, also look at getRemoteArgs (above)
    const { workingProperties } = stage;
    stage.remoteContext = {
      clientId: this.clientManager.clientId,
      requestId: workingProperties._requestId || uuidv1(),
      executionId: workingProperties._executionId || uuidv1(),
      configName: stage.configName,
      executionConfigName,
      useTestData:
        executor.useTestData !== undefined
          ? executor.useTestData
          : schemaManager.useTestData,
      pageNumber: workingProperties.pageNumber || 0,
      outputAsMap: workingProperties.outputAsMap,
      orderBy: workingProperties.orderBy,
      ascDesc: workingProperties.ascDesc,
      testContext: workingProperties._testContext,
      fetchPolicy: stage.getFetchPolicy(),
      enableNetworkSubscriptions: this.enableNetworkSubscriptions,
      useLocalLink: stage.useLocalLink,
      clientLocalTimestamp: getLocalIsoTimestamp(),
      requestingUserId: this.requestingUserId,
    };
    return {
      headers: {
        [HEADER_REMOTE_CONTEXT]: safeJsonStringify(stage.remoteContext),
      },
    };
  }

  public getGraphQLRequestContextBatch(stage: StageImpl, batchId: string) {
    return {
      headers: {
        [HEADER_REMOTE_CONTEXT]: safeJsonStringify({
          ...stage.remoteContext,
          batchId,
        }),
      },
    };
  }

  public static getRemoteContextFromHeaders(headers: string[]): IRemoteContext {
    const header = headers?.[HEADER_REMOTE_CONTEXT];
    if (header) {
      return JSON.parse(header);
    }
  }

  // Returns the executionId, or if waitForCompletion is set, returns the IPipelineExecution result
  public async executePipelineRemoteAsync(
    params: IExecutePipelineRemoteAsyncParams,
  ): Promise<string | IPipelineExecution> {
    const {
      name,
      queueName,
      actualQueueName,
      s3UUID,
      waitForCompletion,
      pipelineExecutionType,
    } = params;

    if (!name && !pipelineExecutionType) {
      throw new Error(
        'You must specify either a pipeline name or a pipelineExecutionType',
      );
    }

    let sqsParams;
    let remoteArgs: IPipelineRemoteParams;
    try {
      remoteArgs = this.getRemoteArgs(params);
      if (waitForCompletion) {
        remoteArgs.doRemoteAsyncLogging = true;
      }
      const { requestId, executionId, executionConfigName } = remoteArgs;
      const message = safeJsonStringify(remoteArgs);

      logger.debug(
        {
          name,
          requestId,
          executionId,
        },
        'Sent SQS (start)',
      );

      logger.debug({ length: message.length, message }, 'Sending message');

      const messagePayload = message;
      //      const messagePayload = LZString.compressToUint8Array(message);

      const queueUrl = this.clientManager.getSqsQueueUrl(
        actualQueueName
          ? { queueName: actualQueueName }
          : {
              queueNameExtension: queueName,
            },
      );
      sqsParams = {
        MessageBody: 'notUsed',
        MessageAttributes: {
          RequestId: {
            DataType: 'String',
            StringValue: requestId,
          },
          ExecutionId: {
            DataType: 'String',
            StringValue: executionId,
          },
          ConfigName: {
            DataType: 'String',
            StringValue: executionConfigName,
          },
        },
        QueueUrl: queueUrl,
        ...(queueUrl.endsWith('.fifo') && {
          MessageDeduplicationId: uuidv1(),
          MessageGroupId: this.clientManager.stackId,
        }),
      };

      const messageAttributes: IPipelineRemoteAsyncMessageAttributes =
        sqsParams.MessageAttributes;

      if (s3UUID) {
        messageAttributes.S3UUID = {
          DataType: 'String',
          StringValue: s3UUID,
        };
      } else {
        if (messagePayload.length > 256 * 1024) {
          const { key, bucket } = this.getPipelineAsyncS3Info({
            uuid: executionId,
          });

          messageAttributes.S3UUID = {
            DataType: 'String',
            StringValue: executionId,
          };
          await this.clientManager.s3Support.putObject({
            Bucket: bucket,
            Key: key,
            Body: Buffer.from(messagePayload),
          });
        } else {
          messageAttributes.Payload = {
            //          DataType: 'Binary',
            //          BinaryValue: Buffer.from(messagePayload),
            DataType: 'String',
            StringValue: messagePayload,
          };
        }
      }

      logger.debug(sqsParams, 'Sending SQS message');
      await retry({
        // Sometimes it can take > 20 seconds to send this. We don't have the patience for that
        timeoutMs: TIMEOUT_ASYNC_SEND,
        command: async () => {
          // This may do a delay
          if (remoteArgs.testContext) {
            await handleTestRequest({
              clientManager: this.clientManager,
              whatsHappening: TestWhatsHappening.ASYNC_SEND,
              testContext: remoteArgs.testContext,
              remoteContext: remoteArgs,
            });
          }
          return this.clientManager.sqsClient.sendMessage(sqsParams);
        },
      });

      logger.debug(
        {
          name,
          requestId,
          executionId,
        },
        'Sent SQS (sent)',
      );
      if (waitForCompletion) {
        const result = (
          await this.waitForRemoteAsyncPipeline({
            executionId,
          })
        )[0];
        if (result.errorString) {
          const error = new Error(
            `Problem executing pipeline: : ${result.errorString}`,
          );
          (error as any).pipelineExecution = result;
          throw error;
        }
        return result;
      }

      return executionId;
    } catch (error) {
      reThrow({
        logger,
        logObject: { sqsParams },
        message: `Error sending message with pipeline name ${name} on queue ${queueName}`,
        error,
      });
    }
  }

  public async getRemoteAsyncPipelineStatus(params: {
    executionId?: string;
    requestId?: string;
    allIncomplete?: boolean;
    chunkId?: number;
  }): Promise<IPipelineExecution[]> {
    const { executionId, requestId, allIncomplete, chunkId } = params;
    const fieldsString = this.graphQLManager.makeGraphqlFieldListFromName({
      name: PIPELINE_EXECUTION,
      configName: SYSTEM,
    });

    const chunkIdFieldString = exists(chunkId) ? `chunkId: ${chunkId}` : '';

    const queryBody = `{ ${fieldsString} }`;
    if (executionId) {
      const result = await this.executeGraphqlQuery({
        query:
          'query {' +
          `  list${PIPELINE_EXECUTION}(executionId: "${executionId}") {` +
          `    items ${queryBody} } }`,
        noCache: true,
      });

      return result;
    }
    if (requestId) {
      return this.executeGraphqlQuery({
        query: `query { list${PIPELINE_EXECUTION}(requestId: "${requestId}" ${chunkIdFieldString}) { items ${queryBody} } }`,
        noCache: true,
      });
    }
    if (allIncomplete) {
      // Note this boolean field is actually a string, because we can't have an index
      // on a boolean (WORM-1710)
      return this.executeGraphqlQuery({
        query: `query { list${PIPELINE_EXECUTION}(inProgress: "true" ${chunkIdFieldString}) { items ${queryBody} } }`,
        noCache: true,
      });
    }
    throw new Error('Specify either executionId, requestId, or allIncomplete');
  }

  public async printRemoteAsyncPipelineStatus(params: { requestId: string }) {
    const { requestId } = params;
    const statuses = await this.getRemoteAsyncPipelineStatus({
      requestId,
    });
    const statusTree = getRemoteAsyncPipelineStatusTree(statuses);
    printRemoteAsyncPipelineStatusTree({
      statusTree,
      requestId,
    });
    const countSummary = getCountSummary(statusTree);
    printCountSummary(countSummary);
  }

  public async waitForRemoteAsyncPipeline(params: {
    executionId?: string;
    requestId?: string;
    allIncomplete?: boolean;
  }): Promise<IPipelineExecution[]> {
    let status;
    const waitForPipeline = waitFor(async () => {
      status = await this.getRemoteAsyncPipelineStatus(params);
      if (!exists(params.executionId)) {
        logger.debug(`Checking ${status.length} pipeline status results`);
        return (
          status.filter((s) => {
            if (!s.endTime) {
              logger.info(
                `Pipeline not finished: ${PipelineManager.statusToString(s)}`,
              );
              return true;
            }
            return false;
          }).length === 0
        );
      }
      if (status[0]) {
        return !!status[0].endTime;
      }
    }, 1000);

    await waitForPipeline();
    return status;
  }

  public static comparePipelineExecutions(a, b) {
    const getString = (record) => {
      const aChunkIdRaw = PipelineManager.getEffectiveChunkId(record);
      let aChunkId = '000' + (aChunkIdRaw ? aChunkIdRaw.toString() : -1);
      aChunkId = aChunkId.slice(-3);
      const aPageRaw = PipelineManager.getEffectivePageNumber(record);
      let aPage = '000' + (aPageRaw ? aPageRaw.toString() : -1);
      aPage = aPage.slice(-3);

      return aChunkId + aPage + record.outputSummary?.nextToken;
    };

    const aString = getString(a);
    const bString = getString(b);

    if (aString < bString) {
      return -1;
    }
    if (aString < bString) {
      return 1;
    }
    return 0;
  }

  public static getEffectiveChunkId(status: IPipelineExecution) {
    return !isNullOrUndefined(status.outputSummary?.chunkId)
      ? status.outputSummary.chunkId
      : !isNullOrUndefined(status.inputSummary?.chunkId)
        ? status.inputSummary?.chunkId
        : undefined;
  }

  public static getEffectivePageNumber(status: IPipelineExecution) {
    return !isNullOrUndefined(status.outputSummary?.pageNumber)
      ? status.outputSummary.pageNumber
      : !isNullOrUndefined(status.inputSummary?.pageNumber)
        ? status.inputSummary?.pageNumber
        : undefined;
  }

  public static statusToString(status: IPipelineExecution): string {
    const tryCount =
      status.sqsTryCount > 1 ? `(sqs try #${status.sqsTryCount}) ` : '';
    const incomplete = status.endTime ? '' : `INC (${status.startTime}) `;
    const timedOut = status.timedOut
      ? `TIMEOUT (retry: ${status.timedOutRetryExecutionId}) `
      : '';
    const mutationTimedOut = status.mutationTimedOut
      ? `MUT T/O (retry: ${status.mutationTimedOutRetryExecutionId}) `
      : '';
    const error1 = status.errorString ? 'ERROR ' : '';
    const error2 = status.errorString
      ? `ERROR: ${status.errorString.slice(0, 50)} `
      : '';
    const inputOrOutput = status.outputSummary
      ? status.outputSummary
      : status.inputSummary
        ? status.inputSummary
        : undefined;

    const chunkId = PipelineManager.getEffectiveChunkId(status);
    const chunk = chunkId !== undefined ? `chunk: ${chunkId} ` : '';

    const pageNumber = PipelineManager.getEffectivePageNumber(status);
    const pageNumberText =
      pageNumber !== undefined ? `page: ${pageNumber} ` : '';

    const recordCount =
      inputOrOutput?.recordCount !== undefined
        ? `count: ${inputOrOutput.recordCount} `
        : '';
    const nextToken = inputOrOutput?.nextToken
      ? `next: ${JSON.parse(inputOrOutput.nextToken).id} `
      : '';
    return (
      `Execution: ${status.executionId} ${
        status.pipelineName || status.pipelineExecutionType
      } ${chunk}${pageNumberText}` +
      `${recordCount}${error1}${tryCount}${incomplete}${timedOut}` +
      `${mutationTimedOut}queue/exec ${
        status.elapsedFromSentTime - status.executionTime
      }/${status.executionTime}ms ${nextToken} ${error2}`
    );
  }

  public getOutputPipelineInfo(
    operation: OperationDefinitionNode,
    variables?: any,
  ): IOutputPipelineInfo {
    const outputPipeline = getTopLevelDirective(
      operation,
      DIRECTIVE_OUTPUTPIPELINE,
    );

    if (!outputPipeline) {
      return { hasOutputPipeline: false };
    }
    const returnVal: IOutputPipelineInfo = { hasOutputPipeline: true };

    const outputPipelineNameNode = outputPipeline.arguments.find(
      (argument) => argument.name.value === DIRECTIVE_OUTPUTPIPELINE_NAME,
    );

    if (outputPipelineNameNode) {
      const outputPipelineNameNodeValue =
        outputPipelineNameNode.value as StringValueNode;
      if (outputPipelineNameNodeValue) {
        returnVal.outputPipeline = outputPipelineNameNodeValue.value;
      }
    }

    const outputPipelineLogOutputNode = outputPipeline.arguments.find(
      (argument) => argument.name.value === DIRECTIVE_OUTPUTPIPELINE_LOGOUTPUT,
    );

    if (outputPipelineLogOutputNode) {
      const outputPipelineLogOutputValue =
        outputPipelineNameNode.value as BooleanValueNode;
      if (outputPipelineLogOutputValue) {
        returnVal.logOutput = outputPipelineLogOutputValue.value;
      }
    }

    const outputPipelineLogExecutionNode = outputPipeline.arguments.find(
      (argument) =>
        argument.name.value === DIRECTIVE_OUTPUTPIPELINE_LOGEXECUTION,
    );

    if (outputPipelineLogExecutionNode) {
      const outputPipelineLogExecutionValue =
        outputPipelineNameNode.value as BooleanValueNode;
      if (outputPipelineLogExecutionValue) {
        returnVal.logExecution = outputPipelineLogExecutionValue.value;
      }
    }

    const outputPipelineArgumentsNode = outputPipeline.arguments.find(
      (argument) => argument.name.value === DIRECTIVE_OUTPUTPIPELINE_ARGUMENTS,
    );
    if (outputPipelineArgumentsNode) {
      const outputPipelineArgumentsNodeValue =
        outputPipelineArgumentsNode.value as VariableNode;

      if (outputPipelineArgumentsNodeValue && variables) {
        returnVal.outputPipelineArgs = valueFromASTUntyped(
          outputPipelineArgumentsNodeValue,
          variables,
        ) as any[];
      }
    }

    const aggregatePipelineNameNode = outputPipeline.arguments.find(
      (argument) =>
        argument.name.value === DIRECTIVE_OUTPUTPIPELINE_AGGREGATE_PIPELINE,
    )?.value as StringValueNode;

    returnVal.aggregatePipeline = aggregatePipelineNameNode?.value;
    return returnVal;
  }

  public async executePipelineRemote(params: IExecutePipelineRemoteParams) {
    const remoteParams = this.getRemoteArgs(params);
    const {
      name,
      retryFunction,
      executionId,
      requestId,
      suppressErrorLogging,
    } = {
      ...params,
      ...remoteParams,
    };
    const fullFunctionName = getLambdaFunctionFullName(
      this.clientManager.stackId,
      PIPELINE_HANDLER_FUNCTION,
    );

    const payload = new TextEncoder().encode(safeJsonStringify(remoteParams));

    const lambdaParams: InvokeWithResponseStreamCommandInput = {
      FunctionName: fullFunctionName,
      Payload: payload,
    };

    return retry({
      command: async () => {
        const result = (await this.clientManager.lambdaSupport
          .getLambdaClient(PIPELINE_HANDLER_FUNCTION)
          .invokeWithResponseStream(
            lambdaParams,
          )) as InvokeWithResponseStreamCommandOutput;
        if (!result.EventStream) {
          throw new Error(
            `Result with no Payload object (this is bad - look at server logs) ${stringifyPretty(
              remoteParams,
            )}`,
          );
        }
        const events = result.EventStream;
        const chunks = [];
        for await (const event of events) {
          if (event.PayloadChunk) {
            chunks.push(new TextDecoder().decode(event.PayloadChunk.Payload));
          } else if (event.InvokeComplete) {
            if (event.InvokeComplete.ErrorCode) {
              throw new Error(
                `Problem reading stream (this is bad - look at server logs): ${event.InvokeComplete.ErrorCode} ${event.InvokeComplete.ErrorDetails}`,
              );
            }
          }
        }
        const resultString = ''.concat(...chunks);
        const parsed: IPipelineRemoteResponse = JSON.parse(resultString);
        if (!parsed) {
          throw new Error(
            `Parsed payload is empty (this is bad - look at server logs) ${stringifyPretty(
              remoteParams,
            )}`,
          );
        }

        if (result.StatusCode === 200 && parsed.response) {
          return parsed.response;
        }

        if (parsed.error) {
          let error;
          try {
            error = JSON.parse(parsed.error);
          } catch (err) {
            reThrow({
              logger,
              error: err,
              message: 'Problem parsing error from remote (this is bad)',
            });
          }
          error.statusCode = parsed.errorStatusCode;
          reThrow({
            logger: suppressErrorLogging ? undefined : logger,
            error,
            logObject: { name, requestId, executionId },
            message: 'Error on remote pipeline',
          });
        }
        reThrow({
          logger,
          logObject: { name, requestId, executionId },
          message: `Unknown error executing remote pipeline - status code: ${result.StatusCode} - this is likely caused by a timeout or out of memory condition, check the pipeline handler (${PIPELINE_HANDLER_FUNCTION}) logs`,
        });
      },
      retryFunction,
      suppressErrorLogging,
    });
  }

  public async getNamedPipeline(
    pipelineName: string,
  ): Promise<IPipelineRemoteParams> {
    // FIXME - replace this mechanism with a more general config object mechanism
    let pipeline = this.clientManager.getLocalConfigObject({
      objectName: pipelineName,
      recordType: APP_DEF_PIPELINE,
    });
    if (pipeline) {
      return pipeline;
    }
    pipeline = this.pipelineCache[pipelineName];
    if (!pipeline) {
      // The list of pipelines might have changed in some other process
      // If the contents of a pipeline changed, when all the lambda
      // servers are restarted so they get the correct one
      const pipelinesResult = await this.listRecords({
        entityName: APP_DEF_PIPELINE,
        configName: SYSTEM,
        noCache: true,
      });
      this.pipelineCache = {};
      pipelinesResult.items.forEach((p) => (this.pipelineCache[p.id] = p));
      pipeline = this.pipelineCache[pipelineName];
    }
    if (!pipeline) {
      throw new Error(`Pipeline ${pipelineName} not found`);
    }
    return pipeline;
  }

  public async executeNamedPipeline(params: ICreatePipelineExecutorParams) {
    try {
      const pipeline = await this.getNamedPipeline(params.name);
      return this.executePipelineObject({ ...params, ...pipeline });
    } catch (error) {
      throw new Error(
        `Failed to execute pipeline: ${params.name}: ${getErrorString(error)}`,
      );
    }
  }

  public async executePipelineObject(params: ICreatePipelineExecutorParams) {
    let executor: PipelineExecutor;
    try {
      executor = this.createPipelineExecutor(params);
      await executor.execute();
      if (!params.callback) {
        executor.close();
      }
      return executor.output;
    } catch (error) {
      let contextDump = '[not captured]';
      if (!this.suppressPipelineDumpOnError) {
        contextDump = `:\nCurrent pipeline:: ${stringify({
          input: params.input,
          noThrow: true,
        })}`;
      }
      reThrow({
        logger,
        message: 'Failed to execute pipeline',
        logObject: {
          pipeline: PipelineManager.remoteIdObject(params),
          context: contextDump,
        },
        error,
      });
    }
  }

  public async validateNamedPipelines() {
    const pipelineRecordsResult = await this.listRecords({
      entityName: APP_DEF_PIPELINE,
      configName: SYSTEM,
      noCache: true,
    });
    const pipelines: { id: string; stages: IStageProperties[] }[] =
      pipelineRecordsResult.items;

    let failedValidation;
    for (const pipeline of pipelines) {
      const pe = this.createPipelineExecutor({
        name: pipeline.id,
        tracingIdentifier: pipeline.id,
        stages: pipeline.stages,
      });
      if (pe.failedValidation) {
        failedValidation = true;
      }
    }
    if (failedValidation) {
      throw new Error(
        'Named pipeline validation failed - see previous error messages',
      );
    }
  }

  private calculateStageName(query: string, context?: any): string {
    let stageName = null;
    let queryFieldName;
    try {
      queryFieldName = getGraphqlQueryFieldName(query, context);
    } catch (error) {
      let curly = '';
      if (getErrorString(error).includes('EOF')) {
        curly += '(You may have left out a curly brace)';
      }
      reThrow({
        logger,
        message: `Problem parsing query ${curly}: ${getErrorString(
          error,
        )}: query: ${query}`,
        error,
      });
    }

    try {
      const entityId = getFieldNameInfo(queryFieldName).entityId;
      // This can happen for the __schema query for example, just return something benign
      if (!entityId) {
        return 'data';
      }
      stageName = entityId;
      stageName = stageName.charAt(0).toLowerCase() + stageName.slice(1);
    } catch (error) {
      reThrow({
        logger,
        message: `Error (likely a bug) trying to calculate the stage name for: ${query}`,
        error,
      });
    }
    return stageName;
  }

  public close(executor: PipelineExecutor) {
    try {
      if (executor.category) {
        this.countPipelineClose(executor.category);
      }
      this.executors.delete(executor);
    } catch (error) {
      throw new Error(
        `Problem closing executor: ${executor.getTracingInfo()}: ${getErrorString(
          error,
        )}`,
      );
    }
  }

  public async runStackInstallPipeline(params: { configName: string }) {
    return this.runStackInstallUninstallPipeline({
      ...params,
      uninstall: false,
    });
  }

  public async runStackUninstallPipeline(params: { configName: string }) {
    return this.runStackInstallUninstallPipeline({
      ...params,
      uninstall: true,
    });
  }

  private async runStackInstallUninstallPipeline(params: {
    configName: string;
    uninstall?: boolean;
  }) {
    const { configName, uninstall = false } = params;

    const pipelineName = uninstall
      ? STACK_UNINSTALL_PIPELINE
      : STACK_INSTALL_PIPELINE;

    const qualifiedPipelineName = MetadataSupport.getQualifiedName(
      pipelineName,
      configName,
    );

    const pipelines = await this.executeGraphqlQuery({
      query: 'query { listPipeline { items { id } } }',
    });

    if (pipelines.find((pl) => pl.id === qualifiedPipelineName)) {
      await this.schemaManager.changeExecutionConfig(configName);

      logger.info(
        `Running ${STACK_INSTALL_PIPELINE} pipeline for ${configName}`,
      );

      return this.executeNamedPipeline({ name: qualifiedPipelineName });
    }
  }

  private async handlePaging(params: {
    peRead: PipelineExecutor;
    pageCallback?:
      | ((records: any[]) => Promise<void>)
      | ((records: any[]) => void);
  }) {
    const { peRead, pageCallback } = params;
    await pageCallback(
      (peRead.properties.records.connectionResult as IQueryConnectionResult)
        .items,
    );
    let cont: boolean = peRead.hasPageForward();
    while (cont) {
      cont = await peRead.pageForward();
      await pageCallback(
        (peRead.properties.records.connectionResult as IQueryConnectionResult)
          .items,
      );
    }
  }

  public async getRecord(params: {
    entityName: string;
    id: string;
    configName?: string;
    executionConfigName?: string;
    input?: { [key: string]: string | number | boolean };
    fetchPolicy?: string;
    requestId?: string;
    noCache?: boolean;
    deep?: boolean;
  }) {
    const {
      entityName,
      id,
      configName = MetadataSupport.getConfigName(entityName),
      executionConfigName,
      input,
      requestId,
      noCache,
      deep,
    } = params;

    let fetchPolicy = params.fetchPolicy;
    if (noCache) {
      fetchPolicy = FETCH_POLICY_NO_CACHE;
    }

    const entity = this.schemaManager.getEntityType({
      name: entityName,
      configName,
    });

    const fieldsString =
      this.graphQLManager.makeGraphqlFieldListFromName({
        name: entity.typeDefinition,
        configName: entity.configName,
        deep,
      }) + // FIXME - this is to support the Rest API in getting these fields; they should be defined in a typedef
      // somewhere, this is a hack
      CREATION_TIME +
      ' ' +
      UPDATE_TIME;
    const unqualifiedEntityName =
      MetadataSupport.getUnqualifiedName(entityName);

    const query = `query { get${unqualifiedEntityName} (id: "${id}") {
      ${fieldsString}
    } }`;

    return this.executeGraphqlQuery({
      query,
      configName,
      executionConfigName,
      input,
      fetchPolicy,
      requestId,
    });
  }

  public buildGetQuery(entityName, entityId): string {
    return this.graphQLManager.buildGetQuery(entityName, entityId);
  }

  public buildListQuery(entityName: string, attributes?: string[]): string {
    return this.graphQLManager.buildListQuery(entityName, attributes);
  }

  public makeGraphqlFieldListFromName(params: IMetadataRequest): string {
    return this.graphQLManager.makeGraphqlFieldListFromName(params);
  }

  private makeArgumentsString(params: {
    entityName: string;
    configName?: string;
    queryArguments?: any;
  }): {
    entity: IEntityType;
    typeDefinition: TypeDefinition;
    argumentsString: string;
  } {
    const { entityName, configName, queryArguments } = params;
    const entity = this.schemaManager.getEntityType({
      name: entityName,
      configName,
    });

    const typeDefinition = this.schemaManager.getTypeDefinition({
      name: entity.typeDefinition,
      configName: entity.configName,
    });

    const argumentsString = queryArguments
      ? makeGraphQLArgumentsString({
          queryArguments,
          typeDefinition,
        })
      : '';

    return { entity, typeDefinition, argumentsString };
  }

  private makeListQueryStage(params: {
    entityName: string;
    configName?: string;
    queryArguments?: { [key: string]: SimpleType };
    queryDirectives?: string;
    fieldDirectives?: string;
    filter?: string;
    fetchPolicy?: string;
    variables?: Record<string, any>;
    variableTypes?: { [key: string]: string };
    fields?: string[];
    deep?: boolean;
    nextToken?: string;
    outputAsMap?: boolean;
  }): IStageProperties {
    const {
      entityName,
      configName = MetadataSupport.getConfigName(entityName),
      queryArguments,
      queryDirectives = '',
      fieldDirectives = '',
      filter,
      fetchPolicy,
      variables,
      variableTypes,
      fields,
      deep,
      nextToken,
      outputAsMap,
    } = params;

    const { argumentsString, entity } = this.makeArgumentsString({
      entityName,
      configName,
      queryArguments,
    });

    let fieldList;
    if (fields) {
      fieldList = fields.join(' ');
    } else {
      const fieldsString = this.graphQLManager.makeGraphqlFieldListFromName({
        name: entity.typeDefinition,
        configName: entity.configName,
        deep,
      });
      // FIXME - this is to support the Rest API in getting these fields; they should be defined in a typedef
      // somewhere, this is a hack
      fieldList = fieldsString + CREATION_TIME + ' ' + UPDATE_TIME;
    }

    const unqualifiedEntityName =
      MetadataSupport.getUnqualifiedName(entityName);

    const filterToUse = filter ? `@filter(expr: "${filter}")` : '';

    let variableDefinitions = '';

    if (variables) {
      if (!variableTypes) {
        throw new Error('Variable types must be specified for variables');
      }

      const missingTypes = _.difference(
        Object.keys(variables),
        Object.keys(variableTypes),
      );

      if (missingTypes.length) {
        throw new Error(
          `Missing variable types for variables ${missingTypes.join(', ')}`,
        );
      }

      variableDefinitions = `(${Object.keys(variables)
        .map(
          (variableName) => `$${variableName}: ${variableTypes[variableName]}`,
        )
        .join(' ')})`;
    }

    const query = `query ${variableDefinitions} ${queryDirectives} {
      list${unqualifiedEntityName} ${argumentsString} ${filterToUse} ${fieldDirectives}  {
        items { ${fieldList} }
      }
    }`;

    return {
      _stageType: StageType.graphQLQuery,
      _name: 'records',
      query,
      fetchPolicy,
      variables: { ...variables, nextToken },
      connectionResult: true,
      outputAsMap,
    };
  }

  public async listRecords(params: {
    entityName: string;
    configName?: string;
    executionConfigName?: string;
    queryArguments?: { [key: string]: SimpleType };
    queryDirectives?: string;
    fieldDirectives?: string;
    input?: { [key: string]: string };
    filter?: string;
    fetchPolicy?: string;
    noCache?: boolean;
    variables?: Record<string, any>;
    variableTypes?: { [key: string]: string };
    fields?: string[];
    // Must specified the @paged directive to use this
    pageCallback?:
      | ((records: any[]) => Promise<void>)
      | ((records: any[]) => void);
    requestId?: string;
    deep?: boolean;
    nextToken?: string;
    outputAsMap?: boolean;
    pipelineCallback?: (e: PipelineExecutor) => void;
  }): Promise<IQueryConnectionResult> {
    const {
      input,
      configName,
      executionConfigName,
      pageCallback,
      noCache,
      requestId,
      pipelineCallback,
    } = params;

    if (noCache) {
      params.fetchPolicy = FETCH_POLICY_NO_CACHE;
    }

    const queryStage = this.makeListQueryStage(params);
    const peRead = this.createPipelineExecutor({
      stages: [queryStage],
      input,
      executionConfigName,
      configName,
      requestId,
      callback: pipelineCallback,
    });

    await peRead.execute();

    if (pageCallback) {
      await this.handlePaging({ peRead, pageCallback });
      return;
    }

    return peRead.properties.records.connectionResult as IQueryConnectionResult;
  }

  public async createRecord(params: {
    entityName: string;
    configName?: string;
    executionConfigName?: string;
    mutationDirectives?: string;
    record: any;
    requestId?: string;
    testContext?: ITestContext;
    noCache?: boolean;
    upsert?: boolean;
  }): Promise<any> {
    const paramsToPass = Object.assign({}, params, {
      records: [params.record],
    });

    delete paramsToPass.record;

    return this.createRecords(paramsToPass);
  }

  public async createRecords(params: {
    entityName: string;
    configName?: string;
    executionConfigName?: string;
    mutationDirectives?: string;
    // Provided for the initial creation of the EntityType definitions
    typeDefName?;
    records: any[];
    noCache?: boolean;
    upsert?: boolean;
    requestId?: string;
    testContext?: ITestContext;
    remoteAsync?: boolean;
    pipelineExecutor?: PipelineExecutor;
  }): Promise<IMutationResult> {
    const {
      entityName,
      executionConfigName,
      mutationDirectives = '',
      typeDefName,
      requestId,
      noCache,
      upsert,
      testContext,
      remoteAsync,
      pipelineExecutor,
    } = params;
    let { configName, records } = params;
    configName = configName || MetadataSupport.getConfigName(entityName);

    let inputType = typeDefName;
    if (!inputType) {
      const { typeName, typeDef } = this.metadataSupport.getTypeInfoFromEntity({
        entityName,
        configName,
        noThrowTypeDef: true,
      });
      inputType = typeName;

      // Sometimes the typeDef might not be available, in the case of initial loading, so
      // assume the records are in the correct format, which they will be.
      if (typeDef) {
        this.metadataSupport.convertObjectReferences({
          obj: records,
          typeDef,
          convertType: ConvertType.OBJECT_TO_ID,
        });
      }
    }

    const unqualifiedEntityName =
      MetadataSupport.getUnqualifiedName(entityName);

    let operationName;
    let inputTypeString;

    if (records.length === 1) {
      records = records[0];
      operationName = upsert ? 'upsert' : 'create';
      inputTypeString = `${inputType}Input!`;
    } else {
      operationName = upsert ? 'batchUpsert' : 'batchCreate';
      inputTypeString = `[${inputType}Input]!`;
    }

    const mutation = `mutation ($records: ${inputTypeString}) ${mutationDirectives} {
      ${operationName}${unqualifiedEntityName}(input: $records) {
        id
      }
    }`;

    const result = await this.executeGraphqlMutation({
      mutation,
      variables: { records },
      configName,
      executionConfigName,
      requestId,
      testContext,
      remoteAsync,
      noCache,
      pipelineExecutor,
    });

    return result;
  }

  public async upsertRecord(params: {
    entityName: string;
    configName?: string;
    executionConfigName?: string;
    mutationDirectives?: string;
    record: any;
    requestId?: string;
    noCache?: boolean;
  }): Promise<any> {
    return this.createRecord({ ...params, upsert: true });
  }

  public async updateRecord(params: {
    entityName: string;
    configName?: string;
    executionConfigName?: string;
    mutationDirectives?: string;
    replaceDirective?: string;
    requestId?: string;
    noCache?: boolean;
    record: any;
  }): Promise<IMutationResult> {
    const {
      entityName,
      executionConfigName,
      mutationDirectives = '',
      replaceDirective = REPLACE_DIRECTIVE_DEFAULT,
      requestId,
      record,
      noCache,
    } = params;
    let { configName } = params;
    configName = configName || MetadataSupport.getConfigName(entityName);
    const workingRecord = _.cloneDeep(record);
    const { typeName: inputType, typeDef } =
      this.metadataSupport.getTypeInfoFromEntity({
        entityName,
        configName,
      });

    const unqualifiedEntityName =
      MetadataSupport.getUnqualifiedName(entityName);

    this.metadataSupport.convertObjectReferences({
      obj: workingRecord,
      typeDef,
      convertType: ConvertType.OBJECT_TO_ID,
    });

    const mutation = `mutation ($record: ${inputType}Input!) ${mutationDirectives} {
      update${unqualifiedEntityName}(input: $record) ${replaceDirective} {
        id
      }
    }`;

    return this.executeGraphqlMutation({
      mutation,
      variables: { record: workingRecord },
      configName,
      executionConfigName,
      requestId,
      noCache,
    });
  }

  public async updateRecords(params: {
    entityName: string;
    configName?: string;
    executionConfigName?: string;
    mutationDirectives?: string;
    replaceDirective?: string;
    records: any[];
    remoteAsync?: boolean;
    pipelineExecutor?: PipelineExecutor;
    requestId?: string;
    noCache?: boolean;
  }): Promise<IMutationResult> {
    const {
      entityName,
      executionConfigName,
      mutationDirectives = '',
      replaceDirective = REPLACE_DIRECTIVE_DEFAULT,
      records,
      remoteAsync,
      pipelineExecutor,
      requestId,
      noCache,
    } = params;
    let { configName } = params;

    if (records.length === 0) {
      return;
    }

    logger.debug(`Updating ${records.length} records`);

    configName = configName || MetadataSupport.getConfigName(entityName);

    const { typeName: inputType, typeDef } =
      this.metadataSupport.getTypeInfoFromEntity({
        entityName,
        configName,
      });

    const unqualifiedEntityName =
      MetadataSupport.getUnqualifiedName(entityName);

    this.metadataSupport.convertObjectReferences({
      obj: records,
      typeDef,
      convertType: ConvertType.OBJECT_TO_ID,
    });

    const mutation = `mutation ($records: [${inputType}Input]!) ${mutationDirectives} {
      batchUpdate${unqualifiedEntityName}(input: $records) ${replaceDirective} {
        id
      }
    }`;

    return this.executeGraphqlMutation({
      mutation,
      variables: { records },
      configName,
      executionConfigName,
      remoteAsync,
      pipelineExecutor,
      requestId,
      noCache,
    });
  }

  public async upsertRecords(params: {
    entityName: string;
    configName?: string;
    executionConfigName?: string;
    mutationDirectives?: string;
    records: any[];
    noCache?: boolean;
  }) {
    return this.createRecords({ ...params, upsert: true });
  }

  public async deleteRecord(params: {
    entityName: string;
    configName?: string;
    executionConfigName?: string;
    mutationDirectives?: string;
    id: string;
    noCache?: boolean;
  }): Promise<any> {
    const {
      entityName,
      executionConfigName,
      mutationDirectives = '',
      id,
      noCache,
    } = params;
    let { configName } = params;
    configName = configName || MetadataSupport.getConfigName(entityName);

    const unqualifiedEntityName =
      MetadataSupport.getUnqualifiedName(entityName);

    if (!id || id.length === 0 || id === '') {
      throw new Error('id not specified');
    }

    const mutation = `mutation deleteRecord ${mutationDirectives} {
      delete${unqualifiedEntityName} (input: { id: "${id}" }) {
        id
      }
    }`;

    const result = await this.executeGraphqlMutation({
      mutation,
      configName,
      executionConfigName,
      noCache,
    });
    return result;
  }

  public async deleteRecords(params: {
    entityName: string;
    configName?: string;
    executionConfigName?: string;
    mutationDirectives?: string;
    ids: string[];
    noCache?: boolean;
  }): Promise<any> {
    const {
      entityName,
      executionConfigName,
      mutationDirectives = '',
      ids,
      noCache,
    } = params;
    let { configName } = params;
    configName = configName || MetadataSupport.getConfigName(entityName);

    const unqualifiedEntityName =
      MetadataSupport.getUnqualifiedName(entityName);

    if (!ids) {
      throw new Error('ids not specified');
    }

    const variables = {
      ids: ids.map((id) => ({ id })),
    };

    const mutation = `mutation batchDeleteRecords ($ids: [IdTypeInput]!) ${mutationDirectives} {
      batchDelete${unqualifiedEntityName} (input: $ids) {
        id
      }
    }`;

    const result = await this.executeGraphqlMutation({
      mutation,
      configName,
      executionConfigName,
      variables,
      noCache,
    });

    return result;
  }

  public async deleteAllRecords(params: {
    entityName: string;
    configName?: string;
    executionConfigName?: string;
    filter?: string;
    queryDirectives?: string;
    queryArguments?: { [key: string]: SimpleType };
    extraFieldsForFilter?: string;
  }) {
    const {
      executionConfigName,
      filter,
      queryDirectives,
      queryArguments,
      extraFieldsForFilter,
    } = params;

    let { configName, entityName } = params;
    if (!configName || MetadataSupport.isQualifiedName(entityName)) {
      configName = MetadataSupport.getConfigName(entityName);
      entityName = MetadataSupport.getUnqualifiedName(entityName);
    }
    if (!configName) {
      throw new Error(
        'The configName must be specified as a parameter or part of the entityName',
      );
    }

    const { argumentsString } = this.makeArgumentsString({
      entityName,
      configName,
      queryArguments,
    });

    let notEmpty = true;
    let connectionResult: IQueryConnectionResult;
    let mostRecentDeleteTime;
    while (notEmpty) {
      const queryDirectivesString = queryDirectives ? queryDirectives : '';
      const filterString = filter ? `@filter(expr: "${filter}")` : '';
      const extraFieldsString = extraFieldsForFilter
        ? extraFieldsForFilter
        : '';
      mostRecentDeleteTime = Date.now();
      const waitNotifyToken = this.clientManager.waitNotify.getWaitToken();
      connectionResult = await this.executeGraphqlQuery({
        query: `query list${entityName} ${queryDirectivesString} @${DIRECTIVE_IGNORE_SIZECLASS}    
          @outputpipeline(name: "system:deleteRecordsPart2",
            arguments: { recordType: ${entityName} waitNotifyToken: "${waitNotifyToken}" }
            logoutput: true
            aggregatepipeline: "system:deleteRecordsAggregate" )
           { list${entityName}${argumentsString} ${filterString} @${DIRECTIVE_ACTIVEASOF}
            { items { id ${extraFieldsString} } }
        }`,
        executionConfigName,
        returnConnectionResult: true,
        noCache: true,
      });

      await this.clientManager.waitNotify.wait(waitNotifyToken);

      while (notEmpty) {
        notEmpty = await this.waitForRecords(0, {
          entityName,
          configName,
          queryArguments,
          fieldDirectives: `@${DIRECTIVE_ACTIVEASOF}`,
        });
        if (filter) {
          break;
        }

        // In some situations the record might have come back or otherwise been missed
        // the first time with the query, so try it again. See WORM-2586
        // Don't retry the delete any more frequently than 20 seconds
        if (notEmpty && Date.now() - mostRecentDeleteTime > 20000) {
          logger.info(
            'Retrying delete query, as waiting for records was not empty',
          );
          break;
        }
      }
      if (filter) {
        break;
      }
    }

    // We have to do this when a filter is used because the count ignores the filter
    // So then we see if all of the pipelines are done. Note that we can't check that earlier,
    // As the output pipelines are started dynamically after the query, so if we wait for
    // all pipelines based on the requestId, they may not all be present.
    if (filter && notEmpty) {
      const status = await this.waitForRemoteAsyncPipeline({
        requestId: connectionResult.requestId,
      });
      logger.debug('Deleting - pipeline executions: ');
      status.forEach((s) => logger.debug(PipelineManager.statusToString(s)));
    }
  }

  public async countRecords(params: {
    entityName: string;
    configName?: string;
    executionConfigName?: string;
    queryArguments?: { [key: string]: SimpleType };
    fieldDirectives?: string;
    noCache?: boolean;
  }): Promise<number> {
    const { queryArguments, fieldDirectives = '', noCache } = params;
    let { entityName, configName } = params;
    if (!configName) {
      configName = MetadataSupport.getConfigName(entityName);
      if (configName) {
        entityName = MetadataSupport.getUnqualifiedName(entityName);
      }
    }
    const { argumentsString } = this.makeArgumentsString({
      entityName,
      configName,
      queryArguments,
    });

    const result = await this.executeGraphqlQuery({
      query: `query @${DIRECTIVE_IGNORE_SIZECLASS} { list${entityName}${argumentsString} ${fieldDirectives} { count } }`,
      noCache,
    });
    return result.count;
  }

  private async deactivateActivateRecords(params: {
    entityName: string;
    configName?: string;
    ids: { id: string }[];
    deactivationDate?: string;
  }): Promise<void> {
    const { ids, deactivationDate = getLocalIsoDate() } = params;

    const { entity, typeDefinition } = this.makeArgumentsString(params);
    const updateRecords = ids.map((idEntry) => ({
      id: idEntry.id,
      [DEACTIVATION_DATE]: deactivationDate,
    }));

    await this.executeGraphqlMutation({
      mutation: `mutation ($updateRecords: [${typeDefinition.getUnqualifiedId()}Input]!) @dolog { batchUpdate${
        entity.unqualifiedId
      } (input: $updateRecords) { id } }`,
      variables: { updateRecords },
    });
  }

  public async deactivateRecords(params: {
    entityName: string;
    configName?: string;
    ids: { id: string }[];
    /**
     * YYYY-MM-DD
     *
     * @default: today's date in local timezone
     */
    deactivationDate?: string;
  }): Promise<void> {
    await this.deactivateActivateRecords(params);
  }

  public async activateRecords(params: {
    entityName: string;
    configName?: string;
    ids: { id: string }[];
  }): Promise<void> {
    await this.deactivateActivateRecords({ ...params, deactivationDate: null });
  }

  // Used when an entity has changed size class, or possibly sort keys
  public async reindexRecords(entityName: string, configName?: string) {
    const promises = [];
    let readCount = 0;
    let writeCount = 0;
    await this.listRecords({
      entityName,
      configName,
      queryDirectives: '@paged',
      pageCallback: async (records: any[]) => {
        logger.debug(`Processing: ${records.length} records`);
        records.forEach((record) =>
          this.clientManager.metadataSupport.unResolveAssociatedEntities({
            record,
            configName,
          }),
        );
        readCount += records.length;

        promises.push(
          this.createRecords({
            entityName,
            records,
            mutationDirectives: '@overwrite @suppresslog @suppresspipeline',
            configName,
          }).then(() => {
            logger.info(`Rewrote ${records.length} records`);
            writeCount += records.length;
          }),
        );
      },
    });

    await Promise.all(promises);
    logger.info(`Reindexing complete: Read: ${readCount} Wrote: ${writeCount}`);
  }

  public async concurrentRecordProcessor(params: {
    processor: (chunkValue: any) => Promise<any>;
    concurrencyField: string;
    concurrencyOverride?: number;
    /**
     * Identifies each chunk and provides the value to be passed to the
     * processor to handle the chunk.
     */
    chunkMap: { [chunkId: string]: any };
    totalRecordCount?: number;
    recordSize?: number;
    recordsPerChunk?: number;
    suppressErrorLogging?: boolean;
    errorCallback?: (params: {
      chunkId: string;
      chunkValue: any;
      error: Error;
    }) => void;
  }) {
    const {
      processor,
      concurrencyField,
      chunkMap,
      totalRecordCount = 0,
      recordSize = 0,
      recordsPerChunk = 0,
      errorCallback,
      concurrencyOverride,
      suppressErrorLogging,
    } = params;

    const initialConcurrency =
      concurrencyOverride ||
      this.clientManager.getStackConfig()[concurrencyField];
    const chunkCount = Object.keys(chunkMap).length;
    const startTime = Date.now();
    const pendingChunks = {};

    await performanceTimes({
      logger,
      recordCount: totalRecordCount,
      recordSize,
      suppressErrorLogging,
      func: async () => {
        const queue = new PQueue({
          concurrency: initialConcurrency,
        });

        const startTimes = {};

        let done = 0;
        let rps;
        let bps;
        let cps;
        // let cps5min;
        for (const chunkId in chunkMap) {
          const chunkValue = chunkMap[chunkId];
          queue
            .add(async () => {
              logger.info(`Start chunk #${chunkId}`);
              startTimes[chunkId] = Date.now();
              if (!exists(concurrencyOverride)) {
                const stackConfig =
                  await this.clientManager.getStackConfigWithRefresh();
                if (queue.concurrency !== stackConfig[concurrencyField]) {
                  queue.concurrency = stackConfig[concurrencyField];
                  logger.info(`Concurrency changes: ${queue.concurrency}`);
                }
              }
              pendingChunks[chunkId] = chunkId;
              await processor(chunkValue);
              delete pendingChunks[chunkId];
              done =
                chunkCount -
                // The +1 is because this one is not finished in the queue.size + queue.pending
                (queue.size + queue.pending) +
                1;

              const elapsedTimeSec = (Date.now() - startTime) / 1000;
              const totalBytes = done * recordsPerChunk * recordSize;

              cps = done / elapsedTimeSec;
              rps = (done * recordsPerChunk) / elapsedTimeSec;
              bps = totalBytes / elapsedTimeSec;
              logger.info(
                `End   chunk #${chunkId} ${
                  Date.now() - startTimes[chunkId]
                }ms - Remaining queue size: ${
                  queue.size
                } (done: ${done}) CPS: ${Math.round(cps)} RPS: ${Math.round(
                  rps,
                )} BPS: ${Math.round(bps)} Concurrency: ${
                  queue.concurrency
                } Pending: ${
                  Object.keys(pendingChunks).join(',') /*queue.pending*/
                } `,
              );
            })
            .then()
            .catch((error) => {
              if (!suppressErrorLogging) {
                reThrow({
                  message: 'Problem processing chunk',
                  error,
                  logger,
                  noThrow: true,
                  logObject: { chunkValue, chunkId },
                });
              }
              if (errorCallback) {
                errorCallback({
                  chunkId,
                  chunkValue,
                  error,
                });
              }
            });
        }

        logger.info('Waiting for processing to finish');
        await queue.onIdle();
        logger.info('Finished with queue');
      },
    });
  }

  // This is used to wait for a given number of records to be present. This is done for two
  // reasons:
  // 1) When deleting all records, we want to cheaply wait until the records are gone.
  // 2) When creating records for tests, we want to wait until the records are all present. Because
  //    DynamoDB is eventually consistent on read, this may take a few seconds.
  // In either case, a filter might be involved. Since PipelineManager.countRecords cannot use
  // a filter (due to the way DynamoDB works), we can't know for sure if the found record count is
  // finally right. So we just stop when the found record count is not changing. And then
  // the caller can deal with additional checks, if required.
  public async waitForRecords(
    recordCount: number,
    countRecordsArgs,
  ): Promise<boolean> {
    const startTime = Date.now();
    const argsString = stringify({
      input: countRecordsArgs,
    });

    let prevCount;
    let prevMatchTimes = 0;
    let notEmpty;

    const waitForRecordsFunction = waitFor(async () => {
      const foundCount = await this.countRecords(countRecordsArgs);

      logger.info(
        `Waiting for ${recordCount} records ${argsString} - found: ${foundCount}`,
      );
      if (prevCount === foundCount) {
        prevMatchTimes++;
        if (prevMatchTimes > 3) {
          logger.info(
            `Waiting for records - matched too many times - assuming stable (maybe ok because of filter): ${foundCount}`,
          );
          notEmpty = true;
          return true;
        }
      }
      prevCount = foundCount;
      return foundCount === recordCount;
    }, 2000);

    await waitForRecordsFunction();

    logger.info(
      `Wait for ${recordCount} records done: ${argsString} in ${
        Date.now() - startTime
      }ms`,
    );
    return notEmpty;
  }
}
