import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { BehaviorSubject, filter, finalize, map, Observable, switchMap, take } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';

import { environment } from '../../../environments/environment';
import { jwtHelper } from '../../auth/state/auth.utils';

/** Response body returned by the `getwstoken` endpoint. */
interface GetWSTokenResponse {
  /**
   * Connection token. It is appended to the WebSocket URL as the `token`
   * query parameter and checked for expiry with `jwtHelper.isTokenExpired`.
   */
  token: string;
}

/**
 * Envelope for messages sent and received over the WebSocket connection.
 *
 * @typeParam T - Type of the `data` payload.
 */
export interface WebSocketMessage<T> {
  /** Action name; `WebsocketService.observe` filters incoming messages by this value. */
  action: string;
  /** Optional payload carried by the message. */
  data?: T;
  /** Optional error text — presumably set by the backend when an action fails (TODO confirm). */
  errorMessage?: string;
}

/**
 * Application-wide WebSocket client.
 *
 * The connection is created lazily: the first call to `observe` or
 * `sendMessage` requests a connection token from
 * `${environment.lambdaGatewayUrl}/getwstoken` and then opens a socket to
 * `environment.webSocketUrl` with that token. The current socket is kept in a
 * `BehaviorSubject` ("holder") whose value is `null` while there is no usable
 * connection.
 *
 * All observables returned by this service are cold — nothing happens until
 * they are subscribed.
 */
@Injectable({
  providedIn: 'root'
})
export class WebsocketService {
  /**
   * True while a token request started by `init()` is pending.
   * Note: it is raised when `init()` is called, not when its result is subscribed.
   */
  private isInitInProgress = false;
  /** Token of the current connection, or `null` when there is none. */
  private connectionToken: string | null = null;
  /** Holder of the current socket; emits `null` when the connection is dropped or expired. */
  private webSocketSubject = new BehaviorSubject<WebSocketSubject<WebSocketMessage<unknown>>>(null);

  constructor(private http: HttpClient) {}

  /**
   * Stream that yields the socket to use.
   *
   * - Token missing or expired: the current socket is discarded and a new one is requested.
   * - Initialization already pending: waits for the first non-null socket, then completes.
   * - Otherwise: the holder itself (it also emits `null` and later sockets).
   */
  private get websocket$(): Observable<WebSocketSubject<WebSocketMessage<unknown>>> {
    // Force to create new connection if connection token expired
    if (!this.connectionToken || jwtHelper.isTokenExpired(this.connectionToken)) {
      this.connectionToken = null;
      this.webSocketSubject.next(null);
    }

    if (!this.webSocketSubject.value) {
      if (this.isInitInProgress) {
        // Someone else is already fetching a token: piggyback on their result.
        return this.webSocketSubject.pipe(
          filter(subject => subject !== null),
          take(1)
        );
      }

      return this.init();
    }

    return this.webSocketSubject;
  }

  /**
   * Requests a connection token, creates the socket and publishes it through
   * the holder.
   *
   * @returns The holder stream, reached once the token request has succeeded.
   *          Errors of the token request are propagated to the subscriber.
   */
  private init(): Observable<WebSocketSubject<WebSocketMessage<unknown>>> {
    this.isInitInProgress = true;

    return this.http.post<GetWSTokenResponse>(`${environment.lambdaGatewayUrl}/getwstoken`, null).pipe(
      // Also clear the flag when the request fails or is cancelled; otherwise
      // every later caller would wait forever for a socket that never arrives.
      finalize(() => {
        this.isInitInProgress = false;
      }),
      switchMap(tokenResponse => {
        this.connectionToken = tokenResponse.token;
        this.isInitInProgress = false;

        const socket: WebSocketSubject<WebSocketMessage<unknown>> = webSocket<WebSocketMessage<unknown>>({
          url: `${environment.webSocketUrl}?token=${this.connectionToken}`,
          closeObserver: {
            next: (event: CloseEvent) => {
              if (!event.wasClean) {
                // Remove connection from subject if it was closed unexpectedly.
                // Only do so when this socket is still the current one, so a
                // stale socket closing late cannot wipe out its replacement.
                if (this.webSocketSubject.value === socket) {
                  this.connectionToken = null;
                  this.webSocketSubject.next(null);
                }
                console.error('Connection closed unexpectedly', event);
              }
            }
          }
        });

        this.webSocketSubject.next(socket);

        return this.webSocketSubject;
      })
    );
  }

  /**
   * Observes incoming messages whose `action` is one of the given names.
   *
   * Accepts a single name, several names as separate arguments, or a
   * non-empty array of names. With no names at all the stream matches nothing.
   *
   * @typeParam T - Expected type of the `data` payload (not validated at runtime).
   * @returns Cold stream of matching messages.
   */
  observe<T>(actions: string): Observable<WebSocketMessage<T>>;
  observe<T>(...actions: string[]): Observable<WebSocketMessage<T>>;
  observe<T>(actions: [string, ...string[]]): Observable<WebSocketMessage<T>>;
  observe<T>(actions: [string, ...string[]] | string, ...moreActions: string[]): Observable<WebSocketMessage<T>> {
    // The variadic overload passes additional names as further arguments;
    // collect them so none is silently dropped.
    const actionsList: string[] =
      typeof actions === 'string' ? [actions, ...moreActions] : [...(actions ?? [])];

    return this.websocket$.pipe(
      filter(subject => subject !== null),
      switchMap((websocketSubject: WebSocketSubject<WebSocketMessage<T>>) => {
        return websocketSubject.pipe(filter(msg => actionsList.includes(msg.action)));
      })
    );
  }

  /**
   * Sends a message over the socket, connecting first if necessary.
   *
   * @param msg - Message to send.
   * @returns Cold observable that emits once after the message has been
   *          handed to the socket and then completes.
   */
  sendMessage<T>(msg: WebSocketMessage<T>) {
    return this.websocket$.pipe(
      filter(subject => subject !== null),
      // The holder never completes and emits again on every reconnect;
      // take(1) guarantees the message is sent exactly once per subscription.
      take(1),
      map((websocketSubject: WebSocketSubject<WebSocketMessage<T>>) => websocketSubject.next(msg))
    );
  }

  /**
   * Closes the current connection and detaches current observers.
   *
   * The service stays usable: the next `observe` / `sendMessage` call
   * establishes a new connection.
   */
  unsubscribe(): void {
    const previousHolder = this.webSocketSubject;

    // Swap in a fresh holder before tearing down the old one so that code
    // reacting to the teardown already sees a clean, usable service.
    this.webSocketSubject = new BehaviorSubject<WebSocketSubject<WebSocketMessage<unknown>>>(null);
    this.connectionToken = null;

    // Completing the WebSocketSubject closes the underlying socket.
    previousHolder.value?.complete();
    // Completing the old holder ends the outer stream of existing observers.
    previousHolder.complete();
  }
}
