import { Inject, Injectable, InjectionToken, Optional } from '@angular/core';
import { HubConnection, HubConnectionBuilder, HubConnectionState } from '@microsoft/signalr';
import { WebSocketClientConfiguration } from './web-socket-client-configuration';
import { BehaviorSubject, Observable, Subject } from 'rxjs';
import { filter, finalize, map, share } from 'rxjs/operators';

/**
 * Supplier of the access token used when connecting to the SignalR hub.
 *
 * An implementation can be provided through the `webSocketAccessTokenFactory`
 * injection token; `WebSocketClientService` calls it from the connection's
 * `accessTokenFactory` option.
 */
export interface AccessTokenFactory {
  /** Returns the access token, either directly or as a promise resolving to it. */
  getAccessToken(): string | Promise<string>;
}

/**
 * Injection token under which an {@link AccessTokenFactory} can be provided.
 * `WebSocketClientService` injects it as optional, so no provider is required.
 */
export const webSocketAccessTokenFactory = new InjectionToken<AccessTokenFactory>('webSocketAccessTokenFactory');

/**
 * Wraps a SignalR `HubConnection` and exposes it through RxJS:
 * per-topic notification streams ({@link getNotificationsForTopic}) and a
 * "disconnected" flag ({@link isDisconnected}).
 *
 * Topic interest is reference-counted: `subscriptions` holds one entry per
 * active subscriber, so the hub only receives `subscribe` for the first
 * subscriber of a topic and `unsubscribe` when the last one goes away.
 */
@Injectable()
export class WebSocketClientService {
  private readonly connectionBuilder = new HubConnectionBuilder();
  private readonly hubConnection: HubConnection;

  /**
   * One entry per active topic subscriber (duplicates are intentional and act
   * as a reference count). Also used to replay subscriptions after a
   * (re)connect.
   */
  private readonly subscriptions: string[] = [];

  /** Guards against attaching the hub handlers more than once. */
  private handlersRegistered = false;

  /** Emits `true` while the connection is closed or reconnecting. */
  private readonly connectionSubject = new BehaviorSubject<boolean>(false);
  private readonly isDisconnected$ = this.connectionSubject.asObservable();

  /** Raw `[topic, payload]` pairs received through the hub's `Publish` method. */
  private readonly notificationSubject = new Subject<[string, unknown]>();
  private readonly notifications$ = this.notificationSubject.asObservable().pipe(share());

  constructor(
    private webSocketClientConfiguration: WebSocketClientConfiguration,
    @Optional() @Inject(webSocketAccessTokenFactory) private accessTokenFactory?: AccessTokenFactory
  ) {
    this.hubConnection = this.buildHubConnection();
  }

  /**
   * Builds (but does not start) the hub connection for the configured URL,
   * with automatic reconnect using a random delay below ten seconds.
   */
  private buildHubConnection(): HubConnection {
    const hubUrl = this.webSocketClientConfiguration.hubUrl;
    const tokenFactory = this.accessTokenFactory;

    // Only hand SignalR a token factory when one was actually injected, so the
    // callback never resolves to `undefined`.
    const builder = tokenFactory
      ? this.connectionBuilder.withUrl(hubUrl, { accessTokenFactory: () => tokenFactory.getAccessToken() })
      : this.connectionBuilder.withUrl(hubUrl);

    return builder
      .withAutomaticReconnect({
        nextRetryDelayInMilliseconds: () => Math.random() * 10000
      })
      .build();
  }

  /**
   * Starts the connection if it is currently disconnected; otherwise resolves
   * immediately.
   *
   * On success the hub handlers are attached (once), all known topics are
   * re-announced to the hub and the disconnected flag is cleared.
   *
   * @returns A promise that resolves once the attempt has finished. Start
   *   failures are logged and NOT propagated, so the promise never rejects.
   */
  public startConnection(): Promise<void> {
    if (this.hubConnection.state !== HubConnectionState.Disconnected) {
      return Promise.resolve();
    }

    return this.hubConnection
      .start()
      .then(() => {
        console.info('Connection started');
        this.registerHandlersOnce();
        this.resubscribe();
        // A previous close left the flag at `true`; clear it after a successful
        // restart. Skipped when already `false` to avoid a redundant emission.
        if (this.connectionSubject.value) {
          this.connectionSubject.next(false);
        }
      })
      .catch((err: unknown) => {
        console.error('Error while starting connection: ', err);
      });
  }

  /**
   * Returns a stream of the payloads published for `topic`.
   *
   * Interest in the topic is registered when the returned observable is
   * subscribed to and released when that subscription is torn down, so every
   * subscriber contributes exactly one acquire and one release.
   *
   * @param topic Topic name; matched against the first argument of `Publish`.
   * @returns Observable of the parsed payloads. The payload is cast to `T`
   *   without runtime validation.
   */
  public getNotificationsForTopic<T>(topic: string): Observable<T> {
    return new Observable<T>((subscriber) => {
      this.subscribeTo(topic);

      const inner = this.notifications$
        .pipe(
          filter(([currentTopic]: [string, unknown]) => currentTopic === topic),
          map(([, data]: [string, unknown]) => data as T)
        )
        .subscribe({
          next: (value: T) => subscriber.next(value),
          error: (err: unknown) => subscriber.error(err),
          complete: () => subscriber.complete()
        });

      return () => {
        inner.unsubscribe();
        this.unsubscribeFrom(topic);
      };
    });
  }

  /** Emits `true` while the connection is closed or reconnecting, `false` otherwise. */
  public isDisconnected(): Observable<boolean> {
    return this.isDisconnected$;
  }

  /**
   * Records one more subscriber for `endpoint`, telling the hub to `subscribe`
   * when it is the first one. The endpoint is recorded even if the hub call
   * throws, so it can be replayed by {@link resubscribe} later.
   */
  private subscribeTo(endpoint: string): void {
    try {
      this.invokeMethod('subscribe', endpoint);
    } finally {
      this.subscriptions.push(endpoint);
    }
  }

  /**
   * Removes one subscriber for `endpoint`, telling the hub to `unsubscribe`
   * when it was the last one. Unknown endpoints are ignored.
   */
  private unsubscribeFrom(endpoint: string): void {
    const index = this.subscriptions.indexOf(endpoint);
    if (index === -1) {
      // splice(-1, 1) would silently drop the last (unrelated) entry.
      return;
    }
    this.subscriptions.splice(index, 1);
    this.invokeMethod('unsubscribe', endpoint);
  }

  /**
   * Invokes `method` on the hub for `endpoint` unless another subscriber for
   * the same endpoint is still recorded. Rejections of the hub call are
   * logged rather than left unhandled.
   */
  private invokeMethod(method: string, endpoint: string): void {
    if (this.subscriptions.includes(endpoint)) {
      console.info('SignalR: ', method, ' ignore ', endpoint);
      return;
    }

    console.info('SignalR: ', method, ' for ', endpoint);
    this.hubConnection.invoke(method, endpoint).catch((err: unknown) => {
      console.error('SignalR: ', method, ' failed for ', endpoint, err);
    });
  }

  /**
   * Re-announces every recorded endpoint to the hub by releasing and then
   * re-acquiring each entry; the recorded list ends up unchanged.
   */
  private resubscribe(): void {
    const snapshot = [...this.subscriptions];
    snapshot.forEach((endpoint) => {
      this.unsubscribeFrom(endpoint);
    });
    snapshot.forEach((endpoint) => {
      this.subscribeTo(endpoint);
    });
  }

  /**
   * Attaches lifecycle and message handlers exactly once. The handlers stay on
   * the same `HubConnection` across restarts, so attaching them again would
   * make every notification fire multiple times.
   */
  private registerHandlersOnce(): void {
    if (this.handlersRegistered) {
      return;
    }
    this.registerEvents();
    this.registerListeners();
    this.handlersRegistered = true;
  }

  /** Mirrors the connection lifecycle into the disconnected flag and replays topics on reconnect. */
  private registerEvents(): void {
    this.hubConnection.onclose(() => {
      console.info('Connected closed, no further updates');
      this.connectionSubject.next(true);
    });
    this.hubConnection.onreconnecting(() => {
      console.info('Connection is reconnecting...');
      this.connectionSubject.next(true);
    });
    this.hubConnection.onreconnected(() => {
      console.info('Connection resubscribing');
      this.resubscribe();
      console.info('Connection reconnected');
      this.connectionSubject.next(false);
    });
  }

  /**
   * Forwards the hub's `Publish` messages into the notification stream.
   * NOTE(review): assumes `data` arrives as a JSON string, since it is passed
   * to `JSON.parse` — confirm against the hub implementation.
   */
  private registerListeners(): void {
    this.hubConnection.on('Publish', (topic: string, data: string) => {
      this.notificationSubject.next([topic, JSON.parse(data)]);
    });
  }
}
