/*!
 *
 * Based on some code with this copyright:
 * Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * Licensed under the Amazon Software License (the "License"). You may not use this file except in compliance with the License. A copy of
 * the License is located at
 *     http://aws.amazon.com/asl/
 * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, express or implied. See the License for the specific language governing permissions and limitations under the License.
 */
// Remainder of code Copyright 2019, Snapstrat, Inc

import {
  ApolloError,
  ApolloLink,
  FetchResult,
  Observable,
  Operation,
} from '@apollo/client/core/index.js';
import { getMainDefinition } from '@apollo/client/utilities';
import { FieldNode } from 'graphql';
import { ZenObservable } from 'zen-observable-ts';

import {
  IMqttClient,
  IMqttProvider,
  ISubHandler,
} from 'universal/mqttProvider';
import { ClientManager } from '../clientManager';
import { reThrow } from '../errors/errorLog';
import { LogLevels, Loggers, getLogger } from '../loggerSupport';

// Logger for link-level events (requests, topic bookkeeping, subscribe results).
const logger = getLogger({ name: Loggers.GRAPHQL_LINKS });

// Logger for connection and message traffic; created with a Warn level, so the
// info-level calls made through it are presumably filtered out by default.
const logSubscriptions = getLogger({
  name: Loggers.GRAPHQL_SUBSCRIPTIONS,
  level: LogLevels.Warn,
});

/**
 * Shape of `extensions.subscription` in the subscription handshake result that
 * the link reads from the operation context.
 */
interface ISubscriptionExtension {
  // Used by the link as the MQTT topic to subscribe to for the operation.
  newSubscription: string;
}

/** Bookkeeping record for one MQTT connection, stored per client id. */
interface IClient {
  // The MQTT client handed to the link when the connection came up.
  client: IMqttClient;
  // Topic names counted against MAX_TOPICS_PER_CONNECTION for this client.
  topics: Set<string>;
}

/** Bookkeeping record for one subscribed topic, stored per topic name. */
interface ITopicObservers {
  // Id of the client that reported the successful subscribe for this topic.
  clientId: string;
  // Every observer that should receive messages published on this topic.
  observers: Set<ZenObservable.Observer<any>>;
}

/** Kinds of work that can be parked in the deferred action queue. */
enum ActionTypes {
  // An incoming MQTT message waiting to be delivered to observers.
  MESSAGE = 0,
  // A request to subscribe an observer to topics that have no subscription yet.
  SUBSCRIBE_NEW_TOPICS = 1,
}

/** One entry of the deferred action queue. */
interface IAction {
  // Selects which handler is invoked when the entry is replayed.
  actionType: ActionTypes;
  // Arguments for that handler: { topic, message, selectionNames } for MESSAGE,
  // { newTopics, observer, operation } for SUBSCRIBE_NEW_TOPICS.
  params: any;
}

// Upper bound on the number of topics the link places on a single MQTT client.
const MAX_TOPICS_PER_CONNECTION = 40;

export const CONTROL_EVENTS_KEY = '@@controlEvents';

// Property key set to `true` on the error object sent to observers when their
// connection is lost with an error. A Symbol where the runtime has one,
// otherwise a plain string fallback.
export const PERMANENT_ERROR_KEY =
  typeof Symbol === 'undefined' ? '@@permanentError' : Symbol('permanentError');

/**
 * Apollo link that delivers subscription data over MQTT.
 *
 * `request` does not forward to a next link. It reads the handshake result
 * stored in the operation context under `subsInfoContextKey`, takes the topic
 * named by `extensions.subscription.newSubscription`, and returns an
 * Observable whose observers are fed from `onMessage`.
 *
 * MQTT clients are created through the injected `IMqttProvider`, which is
 * handed `this` as its `subHandler`; the public `on*` methods are the
 * callbacks it is expected to invoke (presumably the `ISubHandler` contract).
 *
 * While any connect or subscribe is outstanding (tracked as string tokens in
 * `deferringReasonsSet`), new subscribe requests and incoming messages are
 * parked in `deferredActionQueue` and replayed in FIFO order once the last
 * token is released.
 */
export class SubscriptionHandshakeLink
  extends ApolloLink
  implements ISubHandler
{
  private readonly mqttProvider: IMqttProvider;

  public readonly clientManager: ClientManager;
  public readonly mqttHost: string;

  private readonly subsInfoContextKey: string;
  private readonly clientIdRoot: string;

  // K(topic) -> owning client id plus every observer listening on the topic
  private topicObservers: Map<string, ITopicObservers> = new Map();

  // K(clientId) -> MQTT client plus the topics currently placed on it
  private clients: Map<string, IClient> = new Map();

  // Appended to clientIdRoot so each new connection gets a distinct client id
  private clientIdSuffix = 0;

  // Connections or subscriptions waiting for reply
  // noinspection JSMismatchedCollectionQueryUpdate - the inspection is not correct in this case
  private deferringReasonsSet: Set<string>;

  // Queue of actions (request, message) waiting for processing
  private readonly deferredActionQueue: IAction[];

  /**
   * @param params.clientManager passed through to the MQTT provider
   * @param params.mqttProvider factory used to open MQTT connections
   * @param params.subsInfoContextKey operation-context key holding the handshake result
   * @param params.clientIdRoot prefix for generated MQTT client ids
   * @param params.mqttEndpoint exposed as `mqttHost` and passed to the provider
   */
  constructor(params: {
    clientManager: ClientManager;
    mqttProvider: IMqttProvider;
    subsInfoContextKey;
    clientIdRoot: string;
    mqttEndpoint: string;
  }) {
    super();
    const {
      clientManager,
      mqttProvider,
      subsInfoContextKey,
      clientIdRoot,
      mqttEndpoint,
    } = params;

    this.clientManager = clientManager;
    this.mqttProvider = mqttProvider;
    this.subsInfoContextKey = subsInfoContextKey;
    this.clientIdRoot = clientIdRoot;
    this.mqttHost = mqttEndpoint;
    this.deferredActionQueue = [];
    this.deferringReasonsSet = new Set();
  }

  /** Ends every known MQTT client, one after another. */
  public async shutdown() {
    logger.debug('Shutdown');
    for (const client of this.clients.values()) {
      await client.client.end();
    }
  }

  /** Best-effort JSON rendering of a value for log and error messages. */
  private static describeForLog(value: unknown): string {
    try {
      const text = JSON.stringify(value);
      // JSON.stringify yields undefined for undefined/functions/symbols
      return text === undefined ? String(value) : text;
    } catch (_err) {
      // e.g. circular structures
      return String(value);
    }
  }

  // Returns true if OK to execute the action now; otherwise the action is
  // queued and false is returned.
  private checkAction(actionType: ActionTypes, params: any) {
    if (this.deferringReasonsSet.size === 0) {
      return true;
    }
    this.deferAction(actionType, params);
    return false;
  }

  /** Appends an action to the tail of the deferred queue. */
  private deferAction(actionType: ActionTypes, params: any) {
    this.deferredActionQueue.push({ actionType, params });
  }

  /** Marks `token` (a client id or topic name) as outstanding. */
  private startDeferringReason(token: string) {
    this.deferringReasonsSet.add(token);
  }

  /**
   * Releases `token` and, while nothing else is outstanding, replays queued
   * actions in FIFO order. Replay stops as soon as a replayed action registers
   * a new outstanding token.
   */
  private finishDeferringReason(token: string) {
    this.deferringReasonsSet.delete(token);
    while (
      this.deferringReasonsSet.size === 0 &&
      this.deferredActionQueue.length > 0
    ) {
      // Dequeue BEFORE dispatching: if the handler throws, or re-enters this
      // method synchronously, the same action must not be executed again.
      const action = this.deferredActionQueue.shift();
      if (!action) {
        break;
      }
      this.dispatchDeferredAction(action);
    }
  }

  /** Invokes the handler that matches a previously queued action. */
  private dispatchDeferredAction(action: IAction) {
    switch (action.actionType) {
      case ActionTypes.MESSAGE:
        this.onMessage(
          action.params.topic,
          action.params.message,
          action.params.selectionNames,
        );
        break;
      case ActionTypes.SUBSCRIBE_NEW_TOPICS:
        this.subscribeNewTopics(
          action.params.newTopics,
          action.params.observer,
          action.params.operation,
        );
        break;
      default:
        logger.error({ action }, 'Unknown deferred action type - dropped');
    }
  }

  /**
   * Builds the Observable for a subscription operation.
   *
   * - If the handshake result carries `errors`, the Observable errors
   *   immediately with an `ApolloError` wrapping them.
   * - If no topic is present, an `Error` is thrown synchronously.
   * - Otherwise, on subscribe the observer is attached to the topic (joining
   *   an existing MQTT subscription when there is one, requesting a new one
   *   when not). The returned teardown detaches the observer and forgets
   *   topics that are left without observers.
   *
   * @throws Error when the handshake result does not name a topic
   */
  public request(operation: Operation) {
    const context = operation.getContext();
    const subsInfo:
      | {
          extensions?: { subscription?: ISubscriptionExtension };
          errors?: any[];
        }
      | undefined = context[this.subsInfoContextKey];

    // Tolerate a missing context entry / extensions / subscription so the
    // failure below is the descriptive "null topic" error, not a TypeError.
    const errors = (subsInfo && subsInfo.errors) || [];
    const subscription =
      subsInfo && subsInfo.extensions && subsInfo.extensions.subscription;
    const newSubscription = subscription ? subscription.newSubscription : null;

    logger.debug({ subsInfo, newSubscription, errors }, 'Subscription request');
    if (errors.length) {
      return new Observable<FetchResult>((observer) => {
        observer.error(
          new ApolloError({
            errorMessage: 'Error during subscription handshake',
            extraInfo: { errors },
            graphQLErrors: errors,
          }),
        );

        return () => {
          // nothing
        };
      });
    }

    if (!newSubscription) {
      // Serialise explicitly; interpolating the object prints "[object Object]"
      const problem = `null topic: ${SubscriptionHandshakeLink.describeForLog(
        subsInfo,
      )}`;
      logger.error(problem);
      throw new Error(problem);
    }
    const requestedTopics: string[] = [newSubscription];

    return new Observable<FetchResult>((observer) => {
      // Partition at subscribe time so the decision reflects the bookkeeping
      // as it is when the observer actually attaches.
      const existingTopics: string[] = [];
      const newTopics = new Set<string>();
      requestedTopics.forEach((topic) => {
        const entry = this.topicObservers.get(topic);
        if (entry) {
          entry.observers.add(observer);
          existingTopics.push(topic);
        } else {
          newTopics.add(topic);
        }
      });
      logger.debug(existingTopics, 'Existing topics');
      logger.debug(Array.from(newTopics), 'newTopics');

      this.subscribeNewTopics(newTopics, observer, operation);

      return () => {
        logger.debug(observer, 'Observer is ending');
        // Deleting entries while iterating a Map with forEach is well-defined.
        this.topicObservers.forEach((entry, topic) => {
          entry.observers.delete(observer);
          if (entry.observers.size === 0) {
            const owner = this.clients.get(entry.clientId);
            if (owner) {
              owner.topics.delete(topic);
            }
            this.topicObservers.delete(topic);
          }
        });

        // Note, we just leave the clients around, as they will likely be
        // filled up again with topics.
        // NOTE(review): nothing here cancels the subscription on the MQTT
        // client itself; only local bookkeeping is released.
      };
    });
  }

  /**
   * Subscribes `observer` to each topic in `newTopics`, filling spare capacity
   * on existing clients first and opening a new connection for the remainder.
   * When other work is outstanding the whole call is queued instead.
   */
  private subscribeNewTopics(
    newTopics: Set<string>,
    observer: ZenObservable.Observer<FetchResult>,
    operation: Operation,
  ) {
    if (
      !this.checkAction(ActionTypes.SUBSCRIBE_NEW_TOPICS, {
        newTopics,
        observer,
        operation,
      })
    ) {
      return;
    }

    // This call may be a replay: a topic can have gained observers since it
    // was queued. Join those instead of subscribing to the topic again.
    let remaining: string[] = [];
    newTopics.forEach((topic) => {
      const entry = this.topicObservers.get(topic);
      if (entry) {
        entry.observers.add(observer);
      } else {
        remaining.push(topic);
      }
    });
    logger.debug(remaining, 'subscribeNewTopics');

    // Hook up to any clients that have room. Iterate a snapshot so callbacks
    // triggered by subscribe cannot disturb the iteration.
    for (const [clientId, entry] of Array.from(this.clients)) {
      if (remaining.length === 0) {
        break;
      }
      const available = Math.max(
        0,
        MAX_TOPICS_PER_CONNECTION - entry.topics.size,
      );
      if (available === 0) {
        continue;
      }
      const batch = remaining.slice(0, available);
      remaining = remaining.slice(available);
      this.subscribeToTopics(clientId, entry.client, batch, observer);
    }

    // Still more, need a new connection and try again
    if (remaining.length > 0) {
      const selectionNames = (
        getMainDefinition(operation.query).selectionSet.selections as FieldNode[]
      ).map(({ name: { value } }) => value);

      const clientId = `${this.clientIdRoot}_${this.clientIdSuffix}`;
      this.clientIdSuffix++;
      logger.debug('Connecting new subscription client - started');
      this.connectNewClient(clientId, selectionNames);
      // Redrive the action once the connection is done - only for the topics
      // that have not been placed on a client yet.
      this.deferAction(ActionTypes.SUBSCRIBE_NEW_TOPICS, {
        newTopics: new Set(remaining),
        observer,
        operation,
      });
    }
  }

  /** Connection-closed callback; handled as a connection loss without error. */
  public onClosed(clientId: string) {
    this.onConnectionLost(clientId);
  }

  /** Offline callback; handled as a connection loss without error. */
  public onOffline(clientId: string) {
    this.onConnectionLost(clientId);
  }

  /**
   * Forgets the client and releases its connection token.
   *
   * When `errorInfo` is given, every observer of every topic placed on that
   * client receives an error flagged with `PERMANENT_ERROR_KEY`, and those
   * topics are dropped. Without `errorInfo` the topic bookkeeping is kept.
   *
   * @param forceLog log the loss at error level instead of info
   */
  public onConnectionLost(
    clientId: string,
    errorInfo?: { errorCode; errorMessage? },
    forceLog?: boolean,
  ) {
    if (errorInfo) {
      const { errorCode, errorMessage } = errorInfo;
      const text = `Connection lost: ${clientId}, ${errorCode} ${errorMessage}`;
      if (forceLog) {
        logSubscriptions.error(text);
      } else {
        logSubscriptions.info(text);
      }
      const lostClient = this.clients.get(clientId);
      if (lostClient) {
        lostClient.topics.forEach((topic) => {
          const entry = this.topicObservers.get(topic);
          if (entry) {
            entry.observers.forEach((obs) =>
              obs.error({
                errorCode,
                errorMessage,
                [PERMANENT_ERROR_KEY]: true,
              }),
            );
          }
          this.topicObservers.delete(topic);
        });
      }
    } else {
      logSubscriptions.info(`Connection lost: ${clientId} (no error)`);
    }
    this.clients.delete(clientId);
    this.finishDeferringReason(clientId);
    logSubscriptions.info(
      { clients: this.clients },
      'Connection lost - remaining clients',
    );
    logSubscriptions.info(
      { observers: this.topicObservers },
      'Connection lost - topics',
    );
  }

  /**
   * Asks the provider for a new MQTT client and blocks further actions on
   * `clientId` until `onConnected` / `onConnectionLost` reports back.
   */
  private connectNewClient(clientId: string, selectionNames: string[]) {
    this.startDeferringReason(clientId);

    try {
      this.mqttProvider.createMqttClient({
        clientManager: this.clientManager,
        subHandler: this,
        clientId,
        mqttHost: this.mqttHost,
        selectionNames,
      });
    } catch (error) {
      // Creation failed synchronously, so no connect callback will arrive to
      // release the token; release it here or every later action stays queued.
      this.finishDeferringReason(clientId);
      reThrow({
        logger: logSubscriptions,
        message: 'Exception creating MQTT client (this is bad)',
        error,
      });
    }
  }

  /**
   * Registers a connected client (if not already known) and releases its
   * connection token, which replays any queued actions.
   */
  public onConnected(mqttClient: IMqttClient, clientId: string) {
    if (!this.clients.has(clientId)) {
      // Topics whose observers are still keyed to this client id (kept after
      // an error-free connection loss) continue to count against its capacity.
      const topics = new Set<string>();
      this.topicObservers.forEach((entry, topic) => {
        if (entry.clientId === clientId) {
          topics.add(topic);
        }
      });
      this.clients.set(clientId, {
        client: mqttClient,
        topics,
      });
    }
    this.finishDeferringReason(clientId);
  }

  /** Starts one subscribe per topic on the given client. */
  private subscribeToTopics<T>(
    clientId: string,
    client: IMqttClient,
    topics: string[],
    observer: ZenObservable.Observer<T>,
  ) {
    topics.forEach((topic) =>
      this.subscribeToTopic(clientId, client, topic, observer),
    );
  }

  /**
   * Subscribe-succeeded callback: records the observer for the topic, counts
   * the topic against the client, and releases the topic token.
   */
  public onSubscribeSuccess(clientId: string, topic: string, observer: any) {
    logger.debug(`subscribed to ${topic}: ONSUCCESS`);
    let entry = this.topicObservers.get(topic);
    if (!entry) {
      entry = { clientId, observers: new Set() };
      this.topicObservers.set(topic, entry);
    }
    entry.observers.add(observer);

    const owner = this.clients.get(clientId);
    if (owner) {
      owner.topics.add(topic);
    }
    this.finishDeferringReason(topic);
  }

  /**
   * Subscribe-failed callback: logs the failure, frees the slot reserved on
   * the client, and releases the topic token so queued actions can proceed.
   *
   * NOTE(review): the observer that asked for the topic is not known here, so
   * it is not told about the failure.
   */
  public onSubscribeFailure(
    clientId: string,
    topic: string,
    errorInfo: { errorCode; errorMessage },
  ) {
    const { errorCode, errorMessage } = errorInfo;
    logger.error(
      { clientId, topic, errorCode, errorMessage },
      'ERROR failed to subscribe to topic ',
    );

    // Keep the slot if the topic is already live on this client (an earlier
    // subscribe for the same topic succeeded).
    const owner = this.clients.get(clientId);
    const entry = this.topicObservers.get(topic);
    const liveOnThisClient = !!entry && entry.clientId === clientId;
    if (owner && !liveOnThisClient) {
      owner.topics.delete(topic);
    }

    // Without this the token stays set forever and nothing is ever replayed.
    this.finishDeferringReason(topic);
  }

  /**
   * Issues a single subscribe. The topic is used as the deferring token and a
   * slot is reserved on the client right away so concurrent placements see the
   * reduced capacity.
   */
  private subscribeToTopic<T>(
    clientId: string,
    client: IMqttClient,
    topic: string,
    observer: ZenObservable.Observer<T>,
  ) {
    this.startDeferringReason(topic);
    const owner = this.clients.get(clientId);
    if (owner) {
      owner.topics.add(topic);
    }
    logger.debug({ topic, clientId }, 'subscribed to topic start');
    client.subscribe(topic, observer);
  }

  /**
   * Message callback: parses the JSON payload and pushes it to every observer
   * of `topic`. Each name in `selectionNames` that is absent from the
   * payload's `data` is filled in as `null`. Queued instead when other work is
   * outstanding; ignored when the payload is not valid JSON or the topic has
   * no observers.
   */
  public onMessage = (
    topic: string,
    message: string,
    selectionNames: string[],
  ) => {
    if (
      !this.checkAction(ActionTypes.MESSAGE, { topic, message, selectionNames })
    ) {
      return;
    }

    let parsedMessage: any;
    try {
      parsedMessage = JSON.parse(message);
    } catch (err) {
      // A malformed payload must not escape into the caller of this callback.
      logger.error({ topic, err }, 'Unparseable message - ignoring');
      return;
    }

    const topicObservers = this.topicObservers.get(topic);
    if (!topicObservers) {
      logger.debug(
        { topic, parsedMessage },
        'No observers for topic - ignoring',
      );
      return;
    }

    logSubscriptions.info(
      { parsedMessage, topic, topicObservers, selectionNames },
      'Message received',
    );

    // JSON "null" parses to null; treat it as an empty envelope.
    const envelope = parsedMessage === null ? {} : parsedMessage;
    const data = envelope.data || {};
    selectionNames.forEach((name) => {
      // Only fill in fields that are missing; keep 0, '' and false as sent.
      if (data[name] === undefined) {
        data[name] = null;
      }
    });

    topicObservers.observers.forEach((observer) => {
      try {
        observer.next({
          ...envelope,
          ...{ data },
        });
      } catch (err) {
        reThrow({
          logger,
          error: err,
          message: `Error on message notification: ${topic}`,
          noThrow: true,
        });
      }
    });
  };
}
