import { environment } from '@environment';
import { BehaviorSubject, defer, iif, Observable, Subject, throwError, timer } from 'rxjs';
import { catchError, concatMap, distinctUntilChanged, retryWhen, tap } from 'rxjs/operators';
import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { Logger } from '@mona/shared/logger';
import { AppError, BaseErrors, isEmpty, isEmptyObject } from '@mona/shared/utils';

/**
 * Ws service
 */
export class WsService<T, R = unknown> {
    /** Socket url */
    originalUrl: string;
    /**
     * Connection status observer
     */
    private _connection$: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
    /**
     * Connection status observer
     */
    get connection$(): Observable<boolean> {
        return this._connection$.pipe(distinctUntilChanged());
    }

    /**
     * Connection status observer - sync
     */
    get connection(): boolean {
        return this._connection$.getValue();
    }
    set connection(value: boolean) {
        this._connection$.next(value);
    }
    /**
     * Subject which emits ws errors
     */
    private _wsError$: Subject<AppError> = new Subject();
    /**
     * Public subject which emits ws errors
     */
    get wsError$(): Observable<AppError> {
        return this._wsError$.asObservable();
    }
    /**
     * Attempts done after failing
     */
    readonly reconnectConfig = {
        attempts: environment.reconnectAttempts,
        interval: environment.reconnectInterval,
        count: 0,
    };
    /**
     * private WebSocketSubject config
     */
    private _config: WebSocketSubjectConfig<T>;
    /**
     * WebSocketSubject config
     */
    get config(): WebSocketSubjectConfig<T> {
        return this._config;
    }
    /**
     * WebSocketSubject config
     */
    private defaultWsConfig: Partial<WebSocketSubjectConfig<T>> = {
        WebSocketCtor: WebSocket,
        closeObserver: {
            next: event => {
                // to differentiate whether connection is lost or closed by unsubscribing
                if (!event.wasClean && this.reconnectConfig.attempts > this.reconnectConfig.count) {
                    return event.preventDefault();
                }
                this.websocket$?.unsubscribe();
                this.websocket$ = null;
                this.connection = false;
                this.logger.log(`${this.constructor.name} disconnected from`, this.originalUrl);
            },
        },
        openObserver: {
            next: () => {
                this.connection = true;
                this.logger.log(`${this.constructor.name} connected to`, this.originalUrl);
            },
        },
    };

    /**
     * Web socket subject holder
     */
    protected websocket$: WebSocketSubject<T>;

    /**
     * Ws messages
     */
    readonly wsMessages$ = new Subject<T>();

    protected readonly logger = new Logger();

    /**
     * Set websocket service config
     *
     * @param url
     * @param accessToken
     * @param wsConfig
     * @param reconnectInterval
     * @param reconnectAttempts
     */
    setConfig(
        url: string,
        accessToken = '',
        wsConfig: Partial<WebSocketSubjectConfig<T>> = {},
        reconnectInterval?: number,
        reconnectAttempts?: number,
    ) {
        try {
            this.originalUrl = new URL(url).href.slice(0, -1);
        } catch (error) {
            throw new Error('Invalid URL for WebSocket');
        }
        if (accessToken) {
            url = url.concat(`?access_token=${accessToken}`);
        }
        this._config = Object.assign(this.defaultWsConfig, {
            url,
            ...wsConfig,
        });
        Object.assign(this.reconnectConfig, {
            interval: reconnectInterval,
            attempts: reconnectAttempts,
        });
    }

    /**
     * Disconnect
     */
    disconnect(): void {
        this.connection = false;
        // this._wsError$?.complete();
        this.websocket$?.unsubscribe();
        this.websocket$?.complete();
        this.websocket$ = null;
    }

    /**
     * Connect to WebSocked
     *
     * @param args
     */
    connect(...args: any): Observable<T | R> {
        if (isEmptyObject(this.config)) {
            throw new Error(`${this.constructor.name} can not connect without proper config`);
        }
        this.logger.log(`${this.constructor.name} try connect to`, this.originalUrl);
        // Do not open new connection until old is closed
        if (this.websocket$) {
            return this.websocket$.asObservable() as Observable<any>;
        }

        this.websocket$ = webSocket(this.config);
        return this.websocket$.pipe(
            this.reconnect(),
            catchError(wsError => {
                this._wsError$.next(wsError);
                this.logger.error(`${this.constructor.name} error:`, wsError.originalError.message);
                this.disconnect();

                if (wsError instanceof AppError) {
                    return throwError(wsError);
                }

                return throwError({
                    errorCode: BaseErrors.WEBSOCKET,
                    originalError: {
                        message: 'Websocket error has happened. Check terminal logs for more details.',
                        wsError,
                    },
                });
            }),
            tap(message => {
                this.wsMessages$.next(message);
            }),
        );
    }

    /**
     * Send message trough websocket, serialize by default subject serializer
     *
     * @param message
     */
    send(message: T) {
        if (this.websocket$ && !isEmpty(message)) {
            this.websocket$ && this.websocket$.next(message);
        }
    }

    /**
     * Retry a given observable by a time span
     */
    private reconnect() {
        const {
            reconnectConfig: _reconnect,
            constructor: { name },
            logger: { log },
        } = this;
        return (source: Observable<any>) => {
            if (!_reconnect.attempts) {
                return source;
            }
            return defer(() => {
                return source.pipe(
                    retryWhen<T>(errors =>
                        errors.pipe(
                            concatMap((error: CloseEvent, index) => {
                                _reconnect.count = index + 1;
                                log(
                                    `${name} reconnect attempt #${_reconnect.count} of ${_reconnect.attempts} in ${_reconnect.interval}ms,`,
                                );
                                return iif(
                                    () => _reconnect.count < _reconnect.attempts,
                                    timer(_reconnect.interval),
                                    throwError(
                                        new AppError({
                                            message: 'Maximum reconnect attempts',
                                            code: BaseErrors.WEBSOCKET_MAX_RETRIES,
                                            status: error.code,
                                        }),
                                    ),
                                );
                            }),
                        ),
                    ),
                );
            });
        };
    }
}
