import { HttpClient } from '@angular/common/http';
import { Inject, Injectable } from '@angular/core';
import Centrifuge from 'centrifuge';
import { Observable } from 'rxjs';
import SockJS from 'sockjs-client';

import { IENVIRONMENT_SERVICE, IEnvironmentService } from '../environment/environment.interface';
import { ISettings, ISocketsService } from './sockets-service.interface';
import { publishReplay, refCount } from 'rxjs/operators';
import { switchMap } from 'rxjs/operators';

@Injectable()
export class CentrifugeService implements ISocketsService {
  private url: string;
  private centrifugeClient$: Observable<{ centrifugeClient: Centrifuge; settings: ISettings }>;

  constructor(@Inject(IENVIRONMENT_SERVICE) private envService: IEnvironmentService, private http: HttpClient) {
    this.url = this.envService.getVar('services.api') as string;
    this.init();
  }

  subscribeObservable<T>(channel: string): Observable<T> {
    return this.centrifugeClient$.pipe(
      switchMap(({ centrifugeClient, settings }) => {
        return new Observable<T>((observer) => {
          const channelKey = `${channel}_${settings.prefix}`;
          const subscription = centrifugeClient.subscribe(channelKey, (message) => {
            observer.next(message.data);
          });

          return () => {
            subscription.unsubscribe();
            subscription.removeAllListeners();
          };
        });
      })
    );
  }

  private init(): void {
    this.centrifugeClient$ = this.http.get<ISettings>(this.url + '/centrifugo/settings').pipe(
      switchMap((settings) => {
        const centrifugeClient = new Centrifuge(settings.url + '/connection/sockjs', {
          subscribeEndpoint: settings.subscribeEndpoint,
          refreshEndpoint: settings.refreshEndpoint,
          sockjs: SockJS,
        });
        centrifugeClient.setToken(settings.jwt);
        const connect$ = new Observable<{ centrifugeClient: Centrifuge; settings: ISettings }>((observer) => {
          centrifugeClient.on('connect', () => {
            observer.next({ centrifugeClient, settings });
          });
          centrifugeClient.on('disconnect', () => {
            centrifugeClient.connect();
          });
        });
        centrifugeClient.connect();

        return connect$;
      }),
      publishReplay(1),
      refCount()
    );
  }
}
