import { Injectable, inject } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Observable, Subject, merge } from 'rxjs';
import { concatMap, mergeMap, switchMap, tap } from 'rxjs/operators';
import {
  connect,
  disconnect,
  setConnectionState,
  setDomainMessageReceived,
  signalRHubConnected,
  updateConnectionState,
} from './signalr.actions';

import { LoggerService } from '@app/services/logger.service';
import { HubConnectionState } from '@microsoft/signalr';
import { Action } from '@ngrx/store';
import { IDomainMessage, isDomainMessage } from '@ra-state';
import { HubConnectionProxy, SignalrService } from '@rockwell-automation-inc/service';
/**
 * NgRx effects that bridge the SignalR hub connection and the store.
 *
 * - `connect` starts the connection and publishes the hub's connection state.
 * - `setConnectionState` hooks the hub callbacks up to local subjects (once)
 *   and turns their emissions into store actions.
 * - `disconnect` stops the connection.
 */
@Injectable({
  providedIn: 'root',
})
export class SignalREffects {
  /** Logger scoped to this class; only read inside effect callbacks. */
  logger = inject(LoggerService).withContext(SignalREffects);

  /**
   * Hub proxy emitted by `signalRService.onHubConnected$`.
   * Assigned in `setStartConnection$`; `setHubConnected$` reads it, so it
   * assumes the connect flow has already produced a proxy.
   */
  hubProxy!: HubConnectionProxy;

  /** Receives every payload of the hub's `OnDomainMessageReceived` callback. */
  domainMsgSubscriber$ = new Subject<IDomainMessage>();

  /** Receives the hub connection state on reconnecting / reconnected / close. */
  connectionStateSubscriber$ = new Subject<HubConnectionState>();

  /** Read-only view of `domainMsgSubscriber$`. */
  onDomainMessage$ = this.domainMsgSubscriber$.asObservable();

  /** Read-only view of `connectionStateSubscriber$`. */
  onConnectionStateUpdate$ = this.connectionStateSubscriber$.asObservable();

  /**
   * Guards against registering the hub callbacks more than once.
   * NOTE(review): the flag is never reset, so if a later `connect` yields a
   * different proxy instance its callbacks would not be registered — confirm
   * whether the service reuses the same proxy.
   */
  eventListenersHookedup = false;

  /**
   * On `connect`: starts the SignalR connection, waits for the hub proxy,
   * stores it, and dispatches `setConnectionState` with the hub's current
   * state, followed by `signalRHubConnected` when that state is `Connected`.
   */
  setStartConnection$ = createEffect((): Observable<Action> => {
    return this.actions$.pipe(
      ofType(connect),
      tap(() => {
        this.logger.log('starting connection');
        this.signalRService.startConnection();
      }),
      concatMap(() => this.signalRService.onHubConnected$),
      switchMap((proxy) => {
        this.logger.log('set hub proxy');
        this.hubProxy = proxy;
        const actions: Action[] = [];
        actions.push(setConnectionState({ payload: proxy.HubConnection.state }));
        if (proxy.HubConnection.state === HubConnectionState.Connected) {
          actions.push(signalRHubConnected());
        }
        // Returning an array makes the operator emit each action in order.
        return actions;
      }),
    );
  });

  /**
   * Structural type guard: a value counts as a domain message when it has a
   * defined `context` property. Returns `false` for `null` / `undefined`
   * instead of throwing.
   *
   * Note: `setHubConnected$` uses the `isDomainMessage` imported from
   * `@ra-state`, not this member.
   */
  isDomainMessage = (val: unknown): val is IDomainMessage => {
    return (val as Partial<IDomainMessage> | null | undefined)?.context !== undefined;
  };

  /**
   * On `setConnectionState`: registers the hub callbacks on first use, then
   * listens to the merged domain-message / connection-state streams and maps
   * each emission to `setDomainMessageReceived` or `updateConnectionState`.
   */
  setHubConnected$ = createEffect((): Observable<Action> => {
    return this.actions$.pipe(
      ofType(setConnectionState),
      tap(() => {
        if (!this.eventListenersHookedup) {
          this.hubProxy.on('OnDomainMessageReceived', (domainMsg) => this.domainMsgSubscriber$.next(domainMsg));
          this.hubProxy.onreconnected(() => this.connectionStateSubscriber$.next(this.hubProxy.HubConnection.state));
          this.hubProxy.onreconnecting(() => this.connectionStateSubscriber$.next(this.hubProxy.HubConnection.state));
          this.hubProxy.onclose((err?: Error) => {
            // We should handle disconnects; see:
            // https://learn.microsoft.com/en-us/aspnet/core/signalr/javascript-client?view=aspnetcore-6.0&tabs=visual-studio#automatically-reconnect
            if (err) {
              this.logger.error(err);
            }
            this.connectionStateSubscriber$.next(this.hubProxy.HubConnection.state);
          });
          this.logger.log('Hooking up signalr event listeners to observables');
          this.eventListenersHookedup = true;
        }
      }),
      // switchMap drops the previous inner subscription on every new
      // setConnectionState, so only one subscription to the subjects is live.
      switchMap(() => merge(this.onDomainMessage$, this.onConnectionStateUpdate$)),
      mergeMap((msg) => {
        if (isDomainMessage(msg)) {
          return [setDomainMessageReceived({ payload: msg })];
        } else {
          // Dispatch updateConnectionState (not setConnectionState) so this
          // effect does not re-trigger itself in a loop.
          this.logger.log('Publishing connection state update event', msg);
          return [updateConnectionState({ payload: msg })];
        }
      }),
    );
  });

  /**
   * On `disconnect`: stops the SignalR connection.
   *
   * Declared with `dispatch: false` because the stream emits the very
   * `disconnect` action it filters on; a dispatching effect would feed that
   * action back into the store and re-trigger itself endlessly.
   */
  disconnect$ = createEffect(
    () => {
      return this.actions$.pipe(
        ofType(disconnect),
        tap(() => {
          this.logger.log('closing connection');
          this.signalRService.stopConnection();
        }),
      );
    },
    { dispatch: false },
  );

  constructor(
    private actions$: Actions,
    private signalRService: SignalrService,
  ) {}
}
