import {combineLatest, Subject, fromEvent, from, merge} from 'rxjs';
import {
  filter,
  startWith,
  withLatestFrom,
  map,
  tap,
  distinctUntilChanged,
} from 'rxjs/operators';

import ROUTE_QUERY from '@core/application/graphql/queries/route.gql';
import isInRouteList from '@core/utils/routing/isInRouteList';
import POPUP_STATE_CHANGE_EVENT from '@core/popup/constants/popupStateChangeEvent';
import {INSTANT_EVENTS} from '@core/extraEvent/constants/extraEventTypes';

import EXTRA_EVENT_SUBSCRIPTION from '../graphql/subscriptions/extraEvent.gql';
import DISABLED_ROUTES from '../constants/disabledRoutes';
import {cacheData, clearCachedData, getCacheData} from './storageActions';
import performExtraAction from './performExtraAction';

/**
 * Maximum age, in milliseconds (4 minutes), of a cached extra event.
 * A cached event whose arrival time is older than this limit is dropped
 * instead of being shown as a pop-up.
 * Exported only for tests.
 * @type {number}
 */
export const TIME_TO_FORGET_POPUP = 4 * 60 * 1000;

/**
 * Starts the extra event watcher.
 *
 * The watcher reacts to every popup state change (open/close), every route
 * change and every appropriate incoming interaction
 * (@see src/packages/dating/websocket/resolvers/extraEvents.js).
 *
 * A regular event is processed only when the current route is not listed in
 * DISABLED_ROUTES and no other popup is queued. Otherwise the event is cached
 * (see ./storageActions) and processed later, once both conditions are met.
 * Only a single event is kept: a newer incoming event is cached in place of
 * the previous one. There is deliberately no queue.
 *
 * The only exception are events whose type is listed in INSTANT_EVENTS
 * (expiration events): they are processed immediately, with no restrictions.
 *
 * @param {ApolloClient} client
 * @param {string} currentUserId
 */
const startExtraEventListener = (client, currentUserId) => {
  /**
   * Tells whether a regular (non-instant) event may be processed right now:
   * the current route must not be disabled and no popup may be queued.
   * @param {{ route: { current: string } }} routeData
   * @param {{ hasQueue: boolean }} popupData
   * @returns {boolean}
   */
  const isProcessingAllowed = (routeData, popupData) =>
    !isInRouteList(DISABLED_ROUTES, routeData.route.current) &&
    !popupData.hasQueue;

  /**
   * Tells whether the event type is one of INSTANT_EVENTS.
   * @param {{ extraEvents: { type: string } }} eventData
   * @returns {boolean}
   */
  const isInstantEvent = (eventData) =>
    INSTANT_EVENTS.some(
      (instantType) => instantType === eventData.extraEvents.type,
    );

  /**
   * Emits on every popup state change (open/close).
   * Seeded with a "no queue" state so that the streams combined with it
   * have a value to work with before the first real DOM event is fired.
   * @type {Observable<{ detail: { hasQueue: boolean } }>}
   */
  const popupStateObservable = fromEvent(
    document,
    POPUP_STATE_CHANGE_EVENT,
  ).pipe(startWith({detail: {hasQueue: false}}));

  /**
   * Local entry point into the pipeline: allows extra event actions to
   * trigger existing actions without going through sockets.
   * It is handed over to `performExtraAction` on every processed event.
   * @type {Subject<unknown>}
   * @see GamePopup::openDiscountPopupOnChangeRoute
   */
  const extraEventsSubject = new Subject();

  /**
   * Emits on each popup state change (open/close) and route change,
   * but only when there is a cached event that may be processed now.
   * The emitted value is the `extraEvents` payload of that cached event.
   * @type {Observable<Object>}
   */
  const processingConditionsObservable = combineLatest([
    client.watchQuery({query: ROUTE_QUERY}),
    popupStateObservable,
  ]).pipe(
    // Pair the current conditions with whatever is cached for this user.
    map((conditions) => [conditions, getCacheData(currentUserId)]),
    // Nothing cached - nothing to do.
    filter(([, cachedEvent]) => Boolean(cachedEvent)),
    filter(([[{data: routeData}, {detail: popupData}]]) =>
      isProcessingAllowed(routeData, popupData),
    ),
    // The cache is cleared before the freshness check below, so an expired
    // event is removed from the cache as well, not only skipped.
    tap(() => clearCachedData(currentUserId)),
    filter(
      ([
        ,
        {
          data: {
            extraEvents: {isComboPedalSecondPart},
            arrivalTime,
          },
        },
      ]) =>
        // The second part of the combo pedal must be shown even after the
        // time limit; events without arrival time are never treated as stale.
        isComboPedalSecondPart ||
        !arrivalTime ||
        Date.now() - arrivalTime < TIME_TO_FORGET_POPUP,
    ),
    map(([, cachedEvent]) => cachedEvent.data.extraEvents),
  );

  /**
   * Emits on each incoming interaction (socket subscription or local subject).
   * Passes the event further if it can be processed now, otherwise caches it.
   * The emitted value is the `extraEvents` payload, which carries at least
   * `type` and `processId`.
   * @type {Observable<{ type: string, processId: string }>}
   */
  const interactionObservable = merge(
    from(client.subscribe({query: EXTRA_EVENT_SUBSCRIPTION})),
    extraEventsSubject,
  ).pipe(
    /**
     * Event-based systems guarantee neither exactly-once delivery nor
     * delivery at all, so consecutive duplicates (same process id and type)
     * are dropped as a precaution.
     */
    distinctUntilChanged(
      ({data: {extraEvents: previous}}, {data: {extraEvents: current}}) =>
        previous.processId === current.processId &&
        previous.type === current.type,
    ),
    withLatestFrom(
      client.watchQuery({query: ROUTE_QUERY}),
      popupStateObservable,
    ),
    filter(([{data: eventData}, {data: routeData}, {detail: popupData}]) => {
      // Instant events are processed with no restrictions.
      if (isInstantEvent(eventData)) {
        return true;
      }

      if (isProcessingAllowed(routeData, popupData)) {
        return true;
      }

      // Can't be shown right now: remember it together with its arrival
      // time so that it can be picked up (or forgotten) later.
      cacheData(currentUserId, {
        data: {...eventData, arrivalTime: Date.now()},
      });

      return false;
    }),
    map(([incomingEvent]) => incomingEvent.data.extraEvents),
  );

  // Process whatever either of the sources lets through.
  merge(interactionObservable, processingConditionsObservable).subscribe(
    (extraEventPayload) => {
      performExtraAction(extraEventPayload, client, extraEventsSubject);
    },
  );
};
export default startExtraEventListener;
