import _ from 'lodash';
import { v1 as uuid } from 'uuid';

import { getLogger, Loggers } from '../loggerSupport';
import {
  CONFIG_SEPARATOR,
  IEntityType,
  SYSTEM_DATA_OPERATION_ACTION_INSTANCE,
  SYSTEM_DATA_OPERATION_CHUNK,
  SYSTEM_DATA_OPERATION_UNIT,
} from '../metadataSupportConstants';
import {
  getUtcIsoTimestamp,
  sequentialArray,
  sortStrings,
} from '../utilityFunctions';

import { ClientManager } from 'universal/clientManager';
import { PipelineManager } from 'universal/pipeline/pipelineManager';
import { SizeClass } from 'universal/sizeClass';
import { BasicType, DateTime, SimpleType } from 'universal/types';

/** Module-level logger used for data-operation bookkeeping messages. */
const logger = getLogger({
  name: Loggers.DATA_OPERATION,
});

/**
 * A record listing value-based chunk ids. When no size class is available,
 * `DataOperation.getIncompleteChunkIds` uses `values` as the full set of
 * chunk ids for a unit.
 */
export interface IValueChunks {
  /** Record id. */
  id: string;
  /** Field names associated with the chunks (meaning not visible here — TODO confirm). */
  fields: Array<string>;
  /** Chunk ids, one per value. */
  values: Array<string>;
  /** Timestamp string for the record. */
  timestamp: string;
}

/** Report of values in one field that were flagged as having an incorrect type. */
export interface IIncorrectType {
  /** Name of the field the values were found in. */
  field: string;
  /** Type associated with the flagged values (detected vs. expected is not visible here — TODO confirm). */
  type: BasicType;
  /**
   * Sample of the flagged values. Intended to be the first 10 values with this
   * type, but when reports are aggregated across chunks the lists are
   * concatenated, so an aggregated report may hold more.
   */
  values: Array<SimpleType>;
  /** Number of flagged values; summed across chunks when aggregated. */
  count: number;
}

/**
 * Identifies a kind of action: a particular IngestConfigSet, Transform, or
 * EntityType (for restore).
 *
 * In the database it is stored as the single string
 * `<entityTypeId><CONFIG_SEPARATOR><entityId>` built by
 * `DataOperation.makeActionIdString`, i.e. `<entityTypeId>:<entityId>` when
 * the separator is ':'.
 */
export interface IDataOperationActionId {
  /** Fully qualified entity type id. */
  entityTypeId: string;
  /** Fully qualified entity id. */
  entityId: string;
}

/**
 * One execution of a data-operation action (corresponds to
 * DataOperationAction). When persisted, `actionId` is flattened to its
 * string form.
 */
export interface IDataOperationActionInstance {
  /** UUID of this execution. */
  id?: string;
  /** The action this is an execution of. */
  actionId?: IDataOperationActionId;
  /** Configuration object the action was run with. */
  configurationObject?: any;
  /** Start time; defaulted to "now" when created through DataOperation.createActionInstance. */
  startDateTime?: DateTime;
  /** End time of the execution. */
  endDateTime?: DateTime;
  /** Whether the execution has completed. */
  completed?: boolean;
}

/**
 * One unit of work within a data operation (corresponds to
 * DataOperationUnit), typically tied to one file (see `s3FilePath`).
 *
 * When written by `DataOperation.updateUnit`, object-valued references are
 * flattened: `actionId` becomes its string form, the action instances become
 * their ids, and `entityType` is stored as an `entityTypeId` field.
 */
export interface IDataOperationUnit {
  /**
   * `<actionId string>[_<filePath>][_<fileComponent>]`, where `filePath` is
   * `s3FilePath` with its first '/'-separated segment removed. Computed by
   * `DataOperation.updateUnit`; a caller-supplied value is replaced.
   */
  id?: string;
  /** The action this unit belongs to; stored as `<entityTypeId><CONFIG_SEPARATOR><entityId>`. */
  actionId?: IDataOperationActionId;
  /** The action instance that created this unit; only its id is stored. */
  creationActionInstance?: IDataOperationActionInstance;
  /** The action instance that processes this unit; only its id is stored. */
  processingActionInstance?: IDataOperationActionInstance;
  /**
   * Entity type of the unit's data. Stored as `entityTypeId` and looked up again
   * on read with `noThrow`, so it may be missing if the entity type was deleted.
   */
  entityType?: IEntityType;
  // the action's configuration object, e.g. IngestConfig
  configurationObject?: any;

  // Path relative to the bucket; its first '/'-separated segment is dropped
  // when building `id`
  s3FilePath?: string;
  splitKey?: string;
  // Appended to `id` when set
  fileComponent?: string;
  // Reset to false by getDataOperationRecordsForAction if any chunk is unprocessed
  processed?: boolean;
  verified?: boolean;

  numberOfChunks?: number;
  numberOfRecords?: number;

  chunkSize?: number;

  // Defaulted to "now" when created through DataOperation.createUnit
  startDateTime?: DateTime;
  processedDateTime?: DateTime;
  verifiedDateTime?: DateTime;

  // An error that happened when the processing starts; an unprocessed unit
  // with this set is reported as an error unit
  initialProcessingError?: string;

  referencedUnits?: IDataOperationUnit[];

  // FIXME - Below is kept for the current usage by Transform, will likely be changed with refactor of that
  requestId?: string;
  chunkType?: ChunkType;
  chunkField?: string;
  sizeClass?: SizeClass;
}

/**
 * Processing record for one chunk of a unit. When written, `actionId` is
 * flattened to its string form and `actionInstance` / `unit` to their ids.
 */
export interface IDataOperationChunk {
  /**
   * `<unit.id>_<actionInstance.id>_[<splitKey>_]<chunkNumber>`, built by
   * `DataOperation.makeChunkIdFromChunk`.
   */
  id?: string;
  actionId?: IDataOperationActionId;
  // Stored as its id
  actionInstance?: IDataOperationActionInstance;
  // Stored as its id
  unit?: IDataOperationUnit;

  // Last component of `id`
  chunkNumber?: number;

  // Included in `id` when set
  splitKey?: string;
  statisticsValues?: IDataOperationStatisticsValue[];

  // REMOVEME - this is used by transform and we will probably remove that functionality
  // from transform (read by DataOperation.getIncompleteChunkIds)
  chunkId?: string;

  // A chunk is complete only when processed, verified and
  // expectedNumberOfRecords === actualNumberOfRecords
  expectedNumberOfRecords?: number;
  expectedNumberOfBytes?: number;
  actualNumberOfRecords?: number;
  processed?: boolean;
  verified?: boolean;
  incorrectTypes?: IIncorrectType[];
  rejectedNumberOfRecords?: number;
  duplicateRecords?: any[];
  // Omitted from the written record by DataOperation.upsertChunk once processed
  processingError?: string;
  // Omitted alongside processingError; not consulted when classifying error chunks
  errorRetryable?: boolean;

  // The uuid for this chunk that goes with each record when added to the database
  uuid?: string;
}

/**
 * Chunking strategy recorded on a unit (`IDataOperationUnit.chunkType`, kept
 * for the current Transform usage). VALUE_CHUNKS presumably pairs with
 * `IValueChunks` — TODO confirm.
 */
export enum ChunkType {
  VALUE_CHUNKS = 'VALUE_CHUNKS',
  FIELD_CHUNKS = 'FIELD_CHUNKS',
}

/**
 * Which statistics to collect for a data operation (not consumed in this
 * module; field semantics assumed from names — TODO confirm).
 */
export interface IDataOperationStatisticsSpecification {
  /** Attribute names used to group records. */
  matchKeys?: Array<string>;
  /** Attribute names whose values are aggregated per group. */
  aggregateAttributes?: Array<string>;
}

/**
 * One statistics row attached to a chunk (`IDataOperationChunk.statisticsValues`).
 */
export interface IDataOperationStatisticsValue {
  /** Values of the match keys identifying this row (assumed — TODO confirm). */
  matchKeys?: Array<string>;
  /** Aggregated number per attribute name. */
  aggregateAttributes?: Record<string, number>;
  /** Number of records that contributed to this row. */
  recordCount?: number;
}

/**
 * Status snapshot produced by `DataOperation.getDataOperationRecordsForAction`.
 * Chunk maps are keyed by chunk id.
 */
export interface IDataOperationStatusReturn {
  /** Action instances of the action (optionally narrowed to one instance). */
  actionInstances?: IDataOperationActionInstance[];
  /** Every unit, each decorated with its chunks. */
  units?: (IDataOperationUnit & { chunks: IDataOperationChunk[] })[];
  /** Units not marked processed. */
  incompleteUnits?: IDataOperationUnit[];
  /** Unprocessed units with an initial processing error or a chunk error. */
  errorUnits?: IDataOperationUnit[];
  /** Units marked processed. */
  completeUnits?: IDataOperationUnit[];
  /** Units marked verified. */
  verifiedUnits?: IDataOperationUnit[];
  /** Chunks with a (non-throughput) processing error that are not both processed and verified. */
  errorChunks: Record<string, IDataOperationChunk>;
  /** Chunks not processed, not verified, or with a record-count mismatch. */
  incompleteChunks: Record<string, IDataOperationChunk>;
  /** Chunks processed, verified and with matching record counts. */
  completeChunks: Record<string, IDataOperationChunk>;
  /** Chunks that reported at least one incorrect type. */
  incorrectTypeChunks: Record<string, IDataOperationChunk>;
  /** Incorrect-type reports aggregated per unit and field. */
  incorrectTypes: IIncorrectType[];
}

/**
 * Persists and queries the bookkeeping records of data operations through the
 * pipeline manager: action instances (SYSTEM_DATA_OPERATION_ACTION_INSTANCE),
 * units (SYSTEM_DATA_OPERATION_UNIT) and chunks (SYSTEM_DATA_OPERATION_CHUNK).
 *
 * Related objects (action id, action instance, unit, entity type) are
 * flattened to string ids when records are written; for units the entity
 * type is resolved again when records are read.
 *
 * `initialize` must be called before any other method is used.
 */
export class DataOperation {
  private clientManager: ClientManager;
  private pipelineManager: PipelineManager;
  // Size-class helper taken from the client manager (exposes sizeClassProperties)
  private sizeClass;

  /**
   * Binds this instance to a client manager and caches its pipeline manager
   * and size-class helper.
   */
  public initialize(clientManager: ClientManager) {
    this.clientManager = clientManager;
    this.pipelineManager = clientManager.pipelineManager;
    this.sizeClass = clientManager.sizeClass;
  }

  /**
   * Serializes an action id to the string stored in the database:
   * `<entityTypeId><CONFIG_SEPARATOR><entityId>`.
   */
  public makeActionIdString(actionId: IDataOperationActionId): string {
    return `${actionId.entityTypeId}${CONFIG_SEPARATOR}${actionId.entityId}`;
  }

  /**
   * Builds a unit id: `<actionIdString>[_<filePath>][_<fileComponent>]`.
   * The optional parts are left out when empty or undefined.
   */
  public makeDataOperationUnitKey(params: {
    actionId?: IDataOperationActionId;
    filePath?: string;
    fileComponent?: string;
  }): string {
    const { actionId, filePath, fileComponent } = params;
    const baseString =
      this.makeActionIdString(actionId) + (filePath ? '_' + filePath : '');
    if (fileComponent) {
      return baseString + '_' + fileComponent;
    }
    return baseString;
  }

  /**
   * Lists the action instances of an action (deep), optionally narrowed to
   * the instance with id `actionInstanceId`.
   */
  public async getAllActionInstances(
    actionId: IDataOperationActionId,
    actionInstanceId?: string,
  ): Promise<IDataOperationActionInstance[]> {
    const actionIdString = this.makeActionIdString(actionId);

    let actionInstanceRecords = (
      await this.pipelineManager.listRecords({
        entityName: SYSTEM_DATA_OPERATION_ACTION_INSTANCE,
        queryArguments: {
          actionId: actionIdString,
        },
        deep: true,
      })
    ).items as IDataOperationActionInstance[];
    if (actionInstanceId) {
      actionInstanceRecords = actionInstanceRecords.filter(
        (ar) => ar.id === actionInstanceId,
      );
    }
    return actionInstanceRecords;
  }

  /** Fetches a single unit record (deep) by id. */
  public async getUnit(id: string): Promise<IDataOperationUnit> {
    return this.pipelineManager.getRecord({
      entityName: SYSTEM_DATA_OPERATION_UNIT,
      id,
      deep: true,
    });
  }

  /**
   * Lists the units of an action (deep). If `actionInstanceId` is given, keeps
   * only units created or processed by that instance. The stored
   * `entityTypeId` of each unit is resolved back into `entityType`.
   */
  public async getAllUnits(
    actionId: IDataOperationActionId,
    actionInstanceId?: string,
  ): Promise<IDataOperationUnit[]> {
    const actionIdString = this.makeActionIdString(actionId);

    let unitRecords: IDataOperationUnit[] = (
      await this.pipelineManager.listRecords({
        entityName: SYSTEM_DATA_OPERATION_UNIT,
        queryArguments: {
          actionId: actionIdString,
        },
        deep: true,
      })
    ).items;

    if (actionInstanceId) {
      unitRecords = unitRecords.filter(
        (u) =>
          u.creationActionInstance?.id === actionInstanceId ||
          u.processingActionInstance?.id === actionInstanceId,
      );
    }

    for (const unit of unitRecords) {
      // Units are stored with a flat `entityTypeId` (see updateUnit).
      const { entityTypeId } = unit as IDataOperationUnit & {
        entityTypeId?: string;
      };
      if (entityTypeId) {
        // It's possible that the entity type was deleted after the ingest (this is done in the tests)
        unit.entityType = this.clientManager.schemaManager.getEntityType({
          name: entityTypeId,
          noThrow: true,
        });
      }
    }

    return unitRecords;
  }

  /**
   * Lists all chunks of an action, optionally narrowed to one action
   * instance. Chunks without an action instance never match that filter.
   */
  public async getAllChunksByActionId(
    actionId: IDataOperationActionId,
    actionInstanceId?: string,
  ): Promise<IDataOperationChunk[]> {
    const actionIdString = this.makeActionIdString(actionId);

    let allChunks = (
      await this.pipelineManager.listRecords({
        entityName: SYSTEM_DATA_OPERATION_CHUNK,
        queryArguments: { actionId: actionIdString },
      })
    ).items as IDataOperationChunk[];

    if (actionInstanceId) {
      // Optional chaining: a chunk without an action instance used to throw here.
      allChunks = allChunks.filter(
        (c) => c.actionInstance?.id === actionInstanceId,
      );
    }

    return allChunks;
  }

  /** Lists all chunk records belonging to the unit `unitId`. */
  public async getAllChunksByUnit(
    unitId: string,
  ): Promise<IDataOperationChunk[]> {
    const allChunks = (
      await this.pipelineManager.listRecords({
        entityName: SYSTEM_DATA_OPERATION_CHUNK,
        queryArguments: { unit: unitId },
      })
    ).items as IDataOperationChunk[];

    return allChunks;
  }

  /**
   * Lists the chunk records of a unit. When `isProcessed` is given, keeps
   * only chunks whose `processed` flag strictly equals it.
   */
  public async getExistingChunksByUnit(params: {
    unitId: string;
    isProcessed?: boolean;
  }): Promise<IDataOperationChunk[]> {
    const { unitId, isProcessed } = params;
    const listRecordsResult = await this.pipelineManager.listRecords({
      entityName: SYSTEM_DATA_OPERATION_CHUNK,
      queryArguments: {
        unit: unitId,
      },
    });

    let existingChunks: IDataOperationChunk[] = listRecordsResult.items;

    if (isProcessed !== undefined) {
      existingChunks = existingChunks.filter(
        ({ processed }) => processed === isProcessed,
      );
    }

    return existingChunks;
  }

  /**
   * Returns the chunk ids of a unit that have no chunk record yet, sorted
   * (numerically when derived from a size class).
   *
   * The full id set comes from `sizeClass` (bucket numbers) when given,
   * otherwise from `valueChunksRecord.values`.
   *
   * @throws Error if neither `sizeClass` nor `valueChunksRecord` is given.
   */
  public async getIncompleteChunkIds(params: {
    unitId: string;
    sizeClass?: SizeClass;
    valueChunksRecord?: IValueChunks;
  }): Promise<string[]> {
    const { unitId, sizeClass, valueChunksRecord } = params;

    if (!sizeClass && !valueChunksRecord) {
      throw new Error('Expected either sizeClass or valueChunksRecord');
    }

    const allChunkIds = sizeClass
      ? this.getChunkIds({ sizeClass })
      : valueChunksRecord.values;

    // NOTE(review): every chunk record that exists for the unit counts as
    // complete here, whether or not it is marked processed. Confirm this is
    // intended (getExistingChunksByUnit supports an `isProcessed` filter).
    const existingChunkIds = (
      await this.getExistingChunksByUnit({
        unitId,
      })
    ).map((c) => c.chunkId);

    const incompleteChunkIds = _.difference(allChunkIds, existingChunkIds);

    return sortStrings({
      array: incompleteChunkIds,
      numbers: !!sizeClass,
    });
  }

  /**
   * Returns the chunk ids '0' … 'numberOfBuckets - 1' for a size class, as
   * produced by sequentialArray.
   *
   * @throws Error if the size class has no properties.
   */
  public getChunkIds({ sizeClass }: { sizeClass: SizeClass }): string[] {
    const properties = this.sizeClass.sizeClassProperties[sizeClass];
    if (!properties) {
      // Previously surfaced as an opaque destructuring TypeError.
      throw new Error(`Unknown sizeClass: ${sizeClass}`);
    }
    const { numberOfBuckets } = properties;

    return sequentialArray(numberOfBuckets).map((number) => number.toString());
  }

  /**
   * Creates and persists a new action instance with a fresh v1 UUID.
   * `startDateTime` defaults to now; fields in `params` take precedence.
   */
  public async createActionInstance(
    params: Omit<IDataOperationActionInstance, 'id'>,
  ): Promise<IDataOperationActionInstance> {
    const id = uuid();
    const record = {
      id,
      startDateTime: getUtcIsoTimestamp(),
      ...params,
    };
    await this.updateActionInstance(record);
    return record;
  }

  /** Upserts an action instance, storing `actionId` in its string form. */
  public async updateActionInstance(params: IDataOperationActionInstance) {
    await this.pipelineManager.upsertRecord({
      entityName: SYSTEM_DATA_OPERATION_ACTION_INSTANCE,
      mutationDirectives: '@suppresslog',
      record: {
        ...params,
        actionId: this.makeActionIdString(params.actionId),
      },
    });
  }

  /**
   * Creates a unit. `startDateTime` defaults to now; fields in `params` take
   * precedence.
   */
  public async createUnit(
    params: Omit<IDataOperationUnit, 'id'>,
  ): Promise<IDataOperationUnit> {
    return this.updateUnit({ startDateTime: getUtcIsoTimestamp(), ...params });
  }

  /**
   * Upserts a unit and returns it with its computed id.
   *
   * The id is derived from `actionId`, `s3FilePath` (with its first
   * '/'-separated segment removed) and `fileComponent`; any id passed in is
   * replaced. Before writing, `actionId` becomes its string form, the action
   * instances become their ids and `entityType` is stored as `entityTypeId`.
   */
  public async updateUnit(
    params: IDataOperationUnit,
  ): Promise<IDataOperationUnit> {
    const actionIdString = this.makeActionIdString(params.actionId);
    const id = this.makeDataOperationUnitKey({
      ...params,
      filePath: params.s3FilePath?.split('/').slice(1).join('/'),
    });
    const record = {
      ...params,
      actionId: actionIdString,
      id,
      creationActionInstance: params.creationActionInstance?.id,
      processingActionInstance: params.processingActionInstance?.id,
      entityTypeId: params.entityType?.id,
    };
    delete record.entityType;
    await this.pipelineManager.upsertRecord({
      entityName: SYSTEM_DATA_OPERATION_UNIT,
      mutationDirectives: '@suppresslog',
      record,
    });

    return { ...params, id };
  }

  /** Deletes unit records by id. */
  public async deleteUnits(ids: string[]) {
    await this.pipelineManager.deleteRecords({
      ids,
      entityName: SYSTEM_DATA_OPERATION_UNIT,
    });
  }

  /** Deletes chunk records by id. */
  public async deleteChunks(ids: string[]) {
    await this.pipelineManager.deleteRecords({
      ids,
      entityName: SYSTEM_DATA_OPERATION_CHUNK,
    });
  }

  /**
   * Builds a chunk id:
   * `<unit.id>_<actionInstance.id>_[<splitKey>_]<chunkNumber>`.
   */
  public makeChunkIdFromChunk(chunk: IDataOperationChunk): string {
    return (
      chunk.unit.id +
      '_' +
      chunk.actionInstance.id +
      '_' +
      (chunk.splitKey ? chunk.splitKey + '_' : '') +
      chunk.chunkNumber
    );
  }

  /**
   * Upserts one chunk record with its computed id. Once the chunk is
   * processed, `processingError` and `errorRetryable` are left out of the
   * written record. Whether that clears stored values depends on upsertRecord
   * semantics, which are not visible here.
   */
  public async upsertChunk(
    chunk: Omit<IDataOperationChunk, 'id'>,
  ): Promise<void> {
    const actionIdString = this.makeActionIdString(chunk.actionId);
    const id = this.makeChunkIdFromChunk(chunk);

    const record = {
      ...chunk,
      actionId: actionIdString,
      id,
      actionInstance: chunk.actionInstance.id,
      unit: chunk.unit.id,
    };

    if (record.processed) {
      delete record.processingError;
      delete record.errorRetryable;
    }

    await this.pipelineManager.upsertRecord({
      entityName: SYSTEM_DATA_OPERATION_CHUNK,
      mutationDirectives: '@suppresslog',
      record,
    });
  }

  /**
   * Creates chunk records in one batch. An empty list is a no-op.
   *
   * All chunks are written with the action id of the first chunk, so callers
   * are expected to pass chunks of a single action.
   */
  public async createChunks(chunks: IDataOperationChunk[]): Promise<void> {
    // Guard: the original dereferenced chunks[0] and threw on an empty list.
    if (chunks.length === 0) {
      return;
    }
    const actionIdString = this.makeActionIdString(chunks[0].actionId);
    const chunksToWrite = chunks.map((c) => ({
      ...c,
      actionId: actionIdString,
      actionInstance: c.actionInstance.id,
      unit: c.unit.id,
      id: this.makeChunkIdFromChunk(c),
    }));

    await this.pipelineManager.createRecords({
      entityName: SYSTEM_DATA_OPERATION_CHUNK,
      mutationDirectives: '@suppresslog',
      records: chunksToWrite,
    });
  }

  /**
   * Deletes every unit and chunk record of an action. Action instance
   * records are not deleted.
   */
  public async removeDataOperationRecordsForAction(
    actionId: IDataOperationActionId,
  ) {
    const unitRecords = await this.getAllUnits(actionId);
    const ids = _.uniq(unitRecords.map((ir) => ir.id));
    await this.deleteUnits(ids);
    await this.deleteAllChunks(actionId);
  }

  /** Deletes every chunk record of an action. */
  public async deleteAllChunks(actionId: IDataOperationActionId) {
    const allChunks = await this.getAllChunksByActionId(actionId);
    const ids = _.uniq(allChunks.map((ir) => ir.id));
    await this.deleteChunks(ids);
  }

  /**
   * Loads the action instances, units and chunks of an action (optionally
   * narrowed to one action instance) and classifies them.
   *
   * Chunk classification:
   *  - complete: processed, verified, and expected record count equals the
   *    actual record count;
   *  - incomplete: everything else;
   *  - error: has a processingError, is not both processed and verified, and
   *    the error is not a ProvisionedThroughputExceeded error;
   *  - incorrect type: reports at least one incorrect type. These reports are
   *    aggregated per unit and field: counts are summed and values
   *    concatenated.
   *
   * Side effect: a unit marked processed that still has an unprocessed chunk
   * is updated in the database (and in the result) to `processed: false`.
   * Each returned unit is decorated in place with `chunks` and `chunkError`.
   */
  public async getDataOperationRecordsForAction(params: {
    actionId: IDataOperationActionId;
    actionInstanceId?: string;
    // FIXME - this is not presently implemented
    incompleteOnly?: boolean;
  }): Promise<IDataOperationStatusReturn> {
    const { actionId, actionInstanceId } = params;

    const actionIdString = this.makeActionIdString(actionId);

    logger.info(`Getting records for ${actionIdString}`);

    // The three queries are independent, so they are issued concurrently.
    const [actionInstances, unitRecords, allChunks] = await Promise.all([
      this.getAllActionInstances(actionId, actionInstanceId),
      this.getAllUnits(actionId, actionInstanceId),
      this.getAllChunksByActionId(actionId, actionInstanceId),
    ]);

    logger.info(`Getting records for ${actionIdString} - DONE`);

    type UnitWithChunks = IDataOperationUnit & {
      chunks: IDataOperationChunk[];
      chunkError: boolean;
    };

    // Group chunks by unit id once instead of re-scanning all chunks per unit.
    // Within a unit, chunks are keyed by id so duplicate ids collapse.
    const chunksByUnitId = new Map<
      string,
      { [id: string]: IDataOperationChunk }
    >();
    for (const chunk of allChunks) {
      const unitId = chunk.unit?.id;
      if (unitId == null) {
        continue;
      }
      let unitChunkMap = chunksByUnitId.get(unitId);
      if (!unitChunkMap) {
        unitChunkMap = {};
        chunksByUnitId.set(unitId, unitChunkMap);
      }
      unitChunkMap[chunk.id] = chunk;
    }

    const unitsMap: { [unitId: string]: UnitWithChunks } = {};
    // Same objects and order as unitRecords, typed with the decoration.
    const decoratedUnits: UnitWithChunks[] = [];

    const errorChunks: { [id: string]: IDataOperationChunk } = {};
    const incompleteChunks: { [id: string]: IDataOperationChunk } = {};
    const completeChunks: { [id: string]: IDataOperationChunk } = {};
    const incorrectTypeChunks: { [id: string]: IDataOperationChunk } = {};

    const incorrectTypeMap: {
      [key: string]: IIncorrectType & { ingestInstanceId: string };
    } = {};

    for (const unit of unitRecords) {
      logger.info(`Getting ingest instance chunk records for ${unit.id}`);

      const unitChunks = Object.values(chunksByUnitId.get(unit.id) ?? {});
      let anyIncompleteChunks = false;
      let anyErrorChunks = false;

      for (const c of unitChunks) {
        const isComplete =
          c.processed &&
          c.verified &&
          c.expectedNumberOfRecords === c.actualNumberOfRecords;
        if (isComplete) {
          completeChunks[c.id] = c;
        } else {
          incompleteChunks[c.id] = c;
          if (!c.processed) {
            anyIncompleteChunks = true;
          }
        }

        if (
          c.processingError &&
          (!c.processed || !c.verified) &&
          // FIXME - use the retry flag instead but that does not work
          !c.processingError.includes('ProvisionedThroughputExceeded')
        ) {
          errorChunks[c.id] = c;
          anyErrorChunks = true;
        }

        if (c.incorrectTypes && c.incorrectTypes.length > 0) {
          incorrectTypeChunks[c.id] = c;
          for (const it of c.incorrectTypes) {
            // A structured key cannot collide the way `unit.id + field` could.
            const key = JSON.stringify([unit.id, it.field]);
            const existing = incorrectTypeMap[key];
            if (!existing) {
              incorrectTypeMap[key] = {
                ingestInstanceId: unit.id,
                ...it,
              };
            } else {
              existing.count += it.count;
              existing.values = existing.values.concat(it.values);
            }
          }
        }
      }

      if (unit.processed && anyIncompleteChunks) {
        logger.error(
          `Incomplete chunks found for ingest instance marked processed: ${unit.id} - marking instance not processed`,
        );

        await this.pipelineManager.updateRecord({
          entityName: SYSTEM_DATA_OPERATION_UNIT,
          record: {
            id: unit.id,
            processed: false,
          },
        });
        unit.processed = false;
      }

      // Decorate the unit object in place, as callers receive these objects.
      const decorated = Object.assign(unit, {
        chunks: unitChunks,
        chunkError: anyErrorChunks,
      });
      unitsMap[unit.id] = decorated;
      decoratedUnits.push(decorated);
    }

    return {
      actionInstances,
      units: Object.values(unitsMap),
      errorChunks,
      incompleteUnits: decoratedUnits.filter((u) => !u.processed),
      errorUnits: decoratedUnits.filter(
        (u) => !u.processed && (u.initialProcessingError || u.chunkError),
      ),
      completeUnits: decoratedUnits.filter((u) => u.processed),
      verifiedUnits: decoratedUnits.filter((u) => u.verified),
      incorrectTypeChunks,
      incompleteChunks,
      completeChunks,
      incorrectTypes: Object.values(incorrectTypeMap),
    };
  }
}
