import { useMemo, useState } from 'react';
import { BehaviorSubject, Observable, skip, type OperatorFunction } from 'rxjs';
import { useConstant } from '../hooks/useConstant';
import { useDynamicCallback } from '../hooks/useDynamicCallback';
import type { UseSubscriptionReturn } from '../hooks/useSubscription';
import type { SubscriptionResponse } from '../types/SubscriptionResponse';

export interface LimitReachedChangeEvent<T> {
  limitReached: boolean;
  lastRecord?: T;
  recordsReceived?: number;
}

export interface WSPaginationLimiterParams<T> {
  /**
   * A BehaviorSubject (must start with a populated value) emitting the current limit. New emissions on this stream
   * cause the pipe to execute nextPage if there now is more room in the limit.
   */
  limitBehaviorSubject: BehaviorSubject<number>;
  nextPage: UseSubscriptionReturn<T>['nextPage'];
  onLimitReachedChange?: ({ limitReached, lastRecord, recordsReceived }: LimitReachedChangeEvent<T>) => void;
}

/**
 * `wsPaginationLimiter` allows you do paging in RxJS with an applied limiter on the max records we want to page in.
 *
 * On every change of the limit being reached or not (reached <-> not reached), `onLimitReachedChange` will emit a new value
 * for you to store in state and convey to the user.
 */
export function wsPaginationLimiter<T>({
  limitBehaviorSubject,
  nextPage,
  onLimitReachedChange,
}: WSPaginationLimiterParams<T>): OperatorFunction<SubscriptionResponse<T>, SubscriptionResponse<T>> {
  return source => {
    return new Observable<SubscriptionResponse<T>>(output => {
      let recordsReceived = 0;
      let limitReached = false;
      let limit = limitBehaviorSubject.value;

      // Subscribe to limit updates. Skip 1 because we already set it above, taking it directly from the behavior subject.
      const limitSub = limitBehaviorSubject.pipe(skip(1)).subscribe(newLimit => {
        limit = newLimit;
        const newLimitReached = recordsReceived >= limit;

        // Only do anything if the limit being raised leads to us going from a state of the limit being reached to the new state of the limit not being reached.
        if (limitReached && !newLimitReached) {
          limitReached = false;
          onLimitReachedChange?.({ limitReached: false });
          // we dont need to overcomplicate things and check that the last received message.next is defined. this is handled by WebsocketClient.
          nextPage();
        }
      });

      const sourceSub = source.subscribe(message => {
        if (message.initial) {
          recordsReceived = 0;
        }

        // We only count received records to the total if they are a page
        const isPaginationMessage = message.initial || message.page;
        if (isPaginationMessage) {
          recordsReceived += message.data.length;
        }

        const newLimitReached = recordsReceived >= limit;

        // Always forward the message
        output.next(message);
        if (newLimitReached !== limitReached) {
          // A change happened, notify
          const lastRecord = message.data.at(-1);
          onLimitReachedChange?.({
            limitReached: newLimitReached,
            lastRecord: newLimitReached ? lastRecord : undefined, // only pass along the last record if we've reached the limit
            recordsReceived: newLimitReached ? recordsReceived : undefined, // only pass the amount of records received if were at the limit
          });
          limitReached = newLimitReached;
        }

        if (!newLimitReached && message.next != null) {
          nextPage();
        }
      });

      return () => {
        if (limitReached) {
          // If limitReached has last been emitted as true from this pipe, we then let the implementer know
          // that this is no longer the case after we are unmounted. An "unmount" of this observable would in the blotter case mean that
          // the observable is killed / re-made.
          onLimitReachedChange?.({ limitReached: false });
        }
        limitSub.unsubscribe();
        sourceSub.unsubscribe();
      };
    });
  };
}

type UseWSPaginationLimiterParams<T> = {
  startingLimit: number | undefined;
} & Pick<WSPaginationLimiterParams<T>, 'nextPage' | 'onLimitReachedChange'>;

/**
 * This hooks creates a wsPaginationLimiter pipe instance for the caller and exposes an API to create a default limit and a function to call for
 * raising the limit. When the raiseLimit function is called, the wsPaginationLimiter pipe instance will call nextPage.
 *
 * If you want to conditionally apply the pagination limiter you can achieve this by setting the startingLimit to Infinity.
 */
export const useWsPaginationLimiter = <T>({
  startingLimit,
  nextPage,
  onLimitReachedChange,
}: UseWSPaginationLimiterParams<T>) => {
  // if startingLimit is undefined, just pass 0. The user has not passed a limit
  const [limit, setLimit] = useState(startingLimit ?? Infinity);
  const limitBehaviorSubject = useConstant(new BehaviorSubject<number>(startingLimit ?? Infinity));

  const raiseLimit: (raiseBy?: number) => void = useDynamicCallback((raiseBy = 10_000) => {
    limitBehaviorSubject.next(limit + raiseBy);
    setLimit(limit + raiseBy);
  });

  const wsPaginationLimiterInstance = useMemo(
    () => wsPaginationLimiter({ limitBehaviorSubject, nextPage, onLimitReachedChange }),
    [limitBehaviorSubject, nextPage, onLimitReachedChange]
  );

  return { raiseLimit, limit, wsPaginationLimiterInstance };
};
