import { Injectable, isDevMode, PLATFORM_ID, inject } from '@angular/core';
import { EMPTY, merge, Observable, ReplaySubject, share, Subject, take, timer } from 'rxjs';
import { Message, MessageType, Topic } from '../../model/model';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { isPlatformBrowser } from '@angular/common';
import { distinctUntilChanged, filter, map, retry, shareReplay, switchMap } from 'rxjs/operators';
import { environment } from '../../environments/environment';
import { v4 as uuidv4 } from 'uuid';

/**
 * Application-wide gateway to the backend WebSocket.
 *
 * The socket is only created when running in a browser; on any other platform
 * the service degrades to a no-op: {@link sendMessage} drops the message and
 * {@link getUpdateStream} returns an empty stream.
 */
@Injectable({
  providedIn: 'root',
})
export class SocketService {
  private readonly platformId = inject(PLATFORM_ID);

  /** Shared socket subject. Stays `undefined` outside the browser. */
  private socket$?: WebSocketSubject<Message<unknown>>;

  /**
   * Connection state: `true` when the socket's open observer fires, `false`
   * when its close observer fires. Consecutive duplicates are suppressed.
   *
   * Backed by a `ReplaySubject(1)` so that consumers subscribing after the
   * last state change still receive the current status. Outside the browser
   * nothing is ever emitted.
   */
  public onlineStatus$: Subject<boolean> = new ReplaySubject<boolean>(1);

  constructor() {
    if (isPlatformBrowser(this.platformId)) {
      const openSubject$ = new Subject<Event>();
      const closeSubject$ = new Subject<CloseEvent>();

      // Fold open/close events into a single boolean status stream.
      merge(openSubject$.pipe(map(() => true)), closeSubject$.pipe(map(() => false)))
        .pipe(distinctUntilChanged())
        .subscribe(this.onlineStatus$);

      this.onlineStatus$.subscribe(value => {
        if (isDevMode()) {
          console.log('Websocket Connection established:', value);
        }
      });

      this.socket$ = webSocket<Message<unknown>>({
        url: `${environment.endpoints.webSocket}`,
        openObserver: openSubject$,
        closeObserver: closeSubject$,
      });
    }
  }

  /**
   * Opens a multiplexed subscription for `topic` on the shared socket.
   *
   * Subscribing sends a `subscribe` message carrying a fresh id; unsubscribing
   * sends an `unsubscribe` message. Only messages whose `reply` equals the id
   * of the most recent subscribe message are let through. The returned stream
   * waits for the first `MessageType.Confirm` message and then forwards the
   * subsequent messages of the subscription.
   *
   * Errors trigger a resubscription (including a new subscribe message) with
   * exponential backoff, see {@link backoff}.
   *
   * @param topic Topic to subscribe to; sent as both `topic` and `value` of
   *   the subscribe/unsubscribe messages.
   * @returns A shared stream of messages for the topic, or an empty stream
   *   when no socket exists (i.e. outside the browser).
   */
  public getUpdateStream(topic: Topic): Observable<Message<unknown>> {
    const socket = this.socket$;
    if (!socket) {
      // No socket outside the browser: behave like sendMessage and do nothing
      // instead of failing with a TypeError.
      return EMPTY;
    }

    // Id of the latest subscribe message; reassigned on every (re)subscription.
    let subscribeMessageId: string | undefined;
    const topic$ = socket
      .multiplex(
        () => {
          subscribeMessageId = uuidv4();
          return { id: subscribeMessageId, messageType: 'subscribe', topic, value: topic };
        },
        () => ({ id: uuidv4(), messageType: 'unsubscribe', topic, value: topic }),
        value => value.reply === subscribeMessageId
      )
      // Shared so that the switchMap below reuses the same subscription
      // instead of sending a second subscribe message.
      .pipe(share());

    return topic$.pipe(
      // wait for the confirm event
      filter(value => value.messageType === MessageType.Confirm),
      take(1),
      // switch to the stream
      switchMap(() => {
        if (isDevMode()) {
          console.log('Websocket Subscription Confirmed', topic);
        }
        return topic$;
      }),

      // in case of a connection abort retry
      retry({
        resetOnSuccess: true,
        // backoff does not use `this`, so passing it unbound is safe.
        delay: this.backoff,
      }),
      share()
    );
  }

  /**
   * Sends a message over the socket. Silently does nothing when no socket
   * exists (i.e. outside the browser).
   *
   * @param msg Message to send.
   */
  sendMessage(msg: Message<unknown>) {
    this.socket$?.next(msg);
  }

  /**
   * Retry delay strategy: exponential backoff starting at 500ms, doubling per
   * attempt and capped at 60 seconds. Logs a warning in dev mode.
   *
   * @param err Error that caused the retry.
   * @param retryCount 1-based number of the upcoming retry attempt.
   * @returns A timer that fires once the computed delay has elapsed.
   */
  private backoff(err: unknown, retryCount: number) {
    const interval = 500;
    const delay = Math.min(Math.pow(2, retryCount - 1) * interval, 60 * 1000);
    if (isDevMode()) {
      console.warn(`Websocket connection retry nr ${retryCount} in ${delay}ms due to error.`, err);
    }
    return timer(delay);
  }
}
