import { fromEvent, merge, switchMap, map, shareReplay, tap, retry, firstValueFrom, of, catchError } from 'rxjs';
import type { Observable } from 'rxjs';
import io from 'socket.io-client';
import { createJawsToken } from 'services/api/proxy-jaws/inner';
import toArray from 'utils/to-array';
import settings from 'configurations/application';
// eslint-disable-next-line no-restricted-imports
import user from 'store/user';

import ServiceBase from '../service-base';

import { parseToWSNamespace, removeWSNamespace } from './utils';
import type { WebSocketResponse, LiveNotificationEvent } from './contracts';

/**
 * Live-notification service built on one socket.io connection.
 *
 * The connection authenticates with a token requested through `createJawsToken` for the
 * current performer (the `viewTypeId` of the user model). Consumers listen to server events
 * through {@link Websocket.on$}; every event name requested that way is remembered in
 * `watchList` so it can be re-sent to the server after an automatic reconnection.
 */
class Websocket extends ServiceBase {
  name = 'websocket';

  /** Emits the `viewTypeId` of the user model on every `user.onModelChange$` emission. */
  private performerId$: Observable<number> = user.onModelChange$.pipe(map(({ viewTypeId }) => viewTypeId));

  /**
   * Emits the credentials for the current performer: the `data.data` payload of the
   * `createJawsToken` response, or `{ interrupted: true }` when the request keeps failing.
   *
   * The request, its retries and the fallback all live inside the `switchMap` inner observable:
   * - `retry` must come before `catchError`, otherwise the error is already replaced by a
   *   regular value and there is nothing left to retry;
   * - handling the failure on the inner observable keeps the outer stream alive, so a later
   *   performer change still triggers a fresh token request.
   * `of(performerId)` is used as the retry anchor: each re-subscription re-emits the id and
   * therefore issues a new `createJawsToken` call.
   */
  private onPerformerAuth$ = this.performerId$.pipe(
    switchMap((performerId) =>
      of(performerId).pipe(
        switchMap((actorId) =>
          createJawsToken({
            headers: {
              'X-Actor-Type': 'performer',
              'X-Actor-Id': actorId,
            },
          })
        ),
        map((response) => response.data.data),
        retry({ count: 3, delay: 500, resetOnSuccess: true }),
        catchError(() => of({ interrupted: true }))
      )
    ),
    shareReplay({ bufferSize: 1, refCount: true })
  );

  /**
   * The socket.io client socket. It is created with `autoConnect: false`; the constructor
   * opens it once credentials are available or the browser reports being online.
   */
  private websocket = io(settings.envVars.websocketEndpoint, {
    multiplex: true,
    // Invoked by socket.io-client when it needs handshake credentials; hands over the latest
    // value of `onPerformerAuth$` (a token payload or the `{ interrupted: true }` marker).
    auth: async (callback: (credentials: { token: string } | { interrupted: boolean }) => void) => {
      const credentials = await firstValueFrom(this.onPerformerAuth$);

      callback(credentials);
    },
    query: {
      ver: 'nautilus',
    },
    transports: ['websocket', 'polling'],
    forceNew: true,
    reconnection: true,
    reconnectionAttempts: 10,
    autoConnect: false,
  });

  /** `connect` events of the socket; the latest one is replayed to late subscribers while the stream is shared. */
  private onConnect$ = fromEvent(this.websocket, 'connect').pipe(shareReplay({ bufferSize: 1, refCount: true }));

  /**
   * Successful automatic reconnections.
   *
   * Listens on the Manager (`socket.io`) rather than on the socket itself: since
   * socket.io-client v3 the `reconnect` event is emitted only by the Manager, so a listener
   * registered on the socket would never fire.
   */
  private onReconnect$ = fromEvent(this.websocket.io, 'reconnect');

  /** `connect_error` events of the socket. */
  private onConnectError$ = fromEvent(this.websocket, 'connect_error');

  /** `auth.timeout` events received on the socket. */
  private onAuthTimeout$ = fromEvent(this.websocket, 'auth.timeout');

  /** Namespaced event names that have already been sent to the server in a `subscribe` message. */
  private watchList = new Set<string>();

  /** Wires the connection lifecycle: online/offline handling, authentication, re-subscription and error logging. */
  constructor() {
    super();

    // Follow the browser's connectivity: open the socket when online, close it when offline.
    fromEvent(window, 'online').subscribe(() => this.websocket.connect());

    fromEvent(window, 'offline').subscribe(() => this.websocket.disconnect());

    this.onPerformerAuth$.subscribe((token) => {
      // Not connected yet: connecting is enough, the `auth` option supplies the credentials
      // during the handshake.
      if (!this.websocket.connected) {
        this.websocket.connect();

        return;
      }

      // Already connected: send the new credentials over the existing connection.
      this.websocket.emit('auth', token);
    });

    // After an automatic reconnection, wait for credentials and then send the whole watch
    // list again. This is required because `subscribeToEvents` skips names that are already
    // in `watchList`, so nothing else would re-send them.
    this.onReconnect$.pipe(switchMap(() => this.onPerformerAuth$)).subscribe(() => {
      this.websocket.emit('subscribe', Array.from(this.watchList));
    });

    this.onConnectError$.subscribe((error) => {
      this.log.error((error as Record<string, unknown>)?.message, 'onConnectError$');
    });

    // On `auth.timeout`, drop the connection and open a new one.
    this.onAuthTimeout$.subscribe(() => {
      this.websocket.disconnect().connect();
    });
  }

  /**
   * Sends a `subscribe` message for the given event names, skipping those already in
   * `watchList`, and records the newly sent names there.
   *
   * @param events Namespaced event names to subscribe to.
   */
  private subscribeToEvents(events: Array<string>): void {
    const eventsToSubscribe = events.filter((event) => !this.watchList.has(event));

    if (!eventsToSubscribe.length) return;

    eventsToSubscribe.forEach((event) => {
      this.watchList.add(event);
    });

    this.websocket.emit('subscribe', eventsToSubscribe);
  }

  /**
   * Listens to one or more live-notification events.
   *
   * On every `connect` event the requested names (converted with `parseToWSNamespace`) are
   * passed to `subscribeToEvents`, and listeners for them are (re)attached to the socket.
   *
   * @param events A single event or a list of events to listen to.
   * @returns An observable emitting, for each received message, the event name with its
   *   namespace removed (`removeWSNamespace`) and the second element of the message as `content`.
   */
  public on$<T>(
    events: LiveNotificationEvent | Array<LiveNotificationEvent>
  ): Observable<{ event: LiveNotificationEvent; content: WebSocketResponse<unknown, T>[1] }> {
    const normalizedEvents = toArray(events).map((event) => parseToWSNamespace(event));

    return this.onConnect$.pipe(
      tap(() => this.subscribeToEvents(normalizedEvents)),
      // `switchMap` drops the listeners of the previous connection before attaching new ones.
      switchMap(() =>
        merge(
          ...normalizedEvents.map((event) =>
            fromEvent<WebSocketResponse<LiveNotificationEvent, T>>(this.websocket, event)
          )
        )
      ),
      map(([header, body]) => ({
        event: removeWSNamespace(header.event),
        content: body,
      }))
    );
  }
}

// Module-level singleton: the service is constructed when this module is first evaluated
// (running the constructor's subscriptions as a side effect), and every importer receives
// that same instance.
export default new Websocket();
