import { Observable, Observer, PartialObserver, ReplaySubject, Subject, Subscriber, Subscription, isObservable, of } from 'rxjs';
import { first } from 'rxjs/operators';

export type WebSocketRawMessage = string | ArrayBuffer | Blob | ArrayBufferView;

export type WebSocketMessage = {
  action: 'keepalive' | 'sync';
  data?: WebSocketSyncData<unknown>;
};

export type WebSocketSyncData<T> = {
  reason: 'create' | 'update' | 'delete';
  source: string;
  payload: T;
};

export class WebSocketSubject extends Subject<WebSocketRawMessage> {
  private _input: Observer<WebSocketRawMessage>;
  private _output: Subject<WebSocketRawMessage>;
  private _socket?: WebSocket;

  constructor(
    private _url: string,
    private _protocols?: string | string[] | Observable<string | string[]>,
    private _binaryType?: 'blob' | 'arraybuffer',
  ) {
    super();
    this._input = new ReplaySubject<WebSocketRawMessage>();
    this._output = new Subject<WebSocketRawMessage>();
  }

  override next(value: WebSocketRawMessage): void {
    this._input.next(value);
  }

  override error(_: unknown): never {
    throw new Error('Method not implemented.');
  }

  override complete(): never {
    throw new Error('Method not implemented.');
  }

  override subscribe(
    observerOrNext?: PartialObserver<WebSocketRawMessage> | ((value: WebSocketRawMessage) => void) | null,
    error?: ((error: unknown) => void) | null,
    complete?: (() => void) | null,
  ): Subscription {
    const protocols$ = isObservable(this._protocols) ? this._protocols : of(this._protocols || undefined);
    protocols$.pipe(first()).subscribe(protocols => this.connect(this._url, protocols));

    const subscriber = new Subscriber<WebSocketRawMessage>(observerOrNext!, error!, complete!);
    subscriber.add(() => {
      if (!this._output.observers.length) {
        if (this._socket && this._socket.readyState === WebSocket.OPEN) {
          this._socket.close();
        }
        this.resetState();
      }
    });
    this._output.subscribe(subscriber);

    return super.subscribe(subscriber);
  }

  override unsubscribe(): void {
    if (this._socket && this._socket.readyState === WebSocket.OPEN) {
      this._socket.close();
    }
    this.resetState();
    super.unsubscribe();
  }

  private connect(url: string, protocols?: string | string[]): void {
    if (this._socket && this._socket.readyState === WebSocket.OPEN) {
      return;
    }
    const observer = this._output;
    const socket = new WebSocket(url, protocols);
    if (this._binaryType) {
      socket.binaryType = this._binaryType;
    }

    socket.onopen = () => {
      if (!this._socket) {
        socket.close();
        this.resetState();
        return;
      }
      observer.next();
      const queue = this._input;
      this._input = Subscriber.create<WebSocketRawMessage>(value => {
        if (socket.readyState === WebSocket.OPEN && value) {
          socket.send(value);
        }
      });
      if (queue instanceof ReplaySubject) {
        queue.subscribe(this._input);
      }
    };

    socket.onclose = (ev: CloseEvent) => {
      this.resetState();
      if (ev.wasClean) {
        observer.complete();
      } else {
        observer.error(ev);
      }
    };

    socket.onmessage = (ev: MessageEvent) => {
      observer.next(ev.data);
    };

    this._socket = socket;
  }

  private resetState(): void {
    this._input = new ReplaySubject<WebSocketRawMessage>();
    this._output = new Subject<WebSocketRawMessage>();
    this._socket = undefined;
  }
}
