import { filter, pipe, scan, shareReplay, type Observable, type UnaryFunction } from 'rxjs';
import type { MinimalSubscriptionResponse } from '../types/SubscriptionResponse';
import { UpdateActionEnum } from '../types/types';

/**
 * Per-key change record for a single non-initial message.
 * Maps an item's unique key to the value held in memory before the message was applied (`prev`, undefined when the key was
 * not in memory) and the item carried by the message for that key (`next`).
 */
type DiffMap<T> = Map<string, { prev: T | undefined; next: T | undefined }>;

export interface WSSubscriptionMemoryOutput<T> {
  /**
   * The current state of the world at any point in time, keyed by each item's unique key -- the result of scanning all received
   * data updates into a map. The map is emptied whenever an `initial: true` message arrives, before that message's data is applied.
   */
  memory: Map<string, T>;
  /** The latest json message received, in its entirety. */
  latestMessage: MinimalSubscriptionResponse<T>;
  /**
   * The per-key changes applied by the latest json message: for every item in its data, the value previously held in memory
   * and the item from the message. If the latest message is an `initial: true` message, it will be undefined.
   */
  diff: DiffMap<T> | undefined;
}

/**
 * An RxJS pipe to attach to any SubscriptionResponse-emitting observable in order to maintain a memory, the latestMessage and a diff.
 * The pipe will not emit anything until some first json message has arrived.
 * The pipe will be shared amongst subscribers, and on new subscriber will return the latest emitted set of its output: `{memory, latestMessage, diff}`.
 * Also, this memory pipe keeps the observable hot at this point -- meaning that if all subscribers leave, the observable will stay around maintaining its memory,
 * waiting for a new subscriber to come along.
 *
 * The memory map is created lazily on the first message of each subscription to the source, so state never leaks between
 * subscriptions (e.g. when the source is re-subscribed after an error, or when the returned operator is applied to several sources).
 * Within one subscription the same map instance is mutated in place and re-emitted; consumers should treat it as read-only.
 *
 * @param getUniqueKey derives the key under which an item is stored in `memory` and reported in `diff`.
 * @returns output of type {@link WSSubscriptionMemoryOutput<T>}.
 * `memory` is the current state of the world -- the result of scanning all received data updates into a map; it is cleared whenever an `initial: true` message arrives.
 * `latestMessage` is the latest json message received.
 * `diff` is the difference applied between the latest message, and the one before that. If the latest message is an `initial: true` message, diff will be undefined.
 */
export function wsSubscriptionMemory<T>(
  getUniqueKey: (item: T) => string
): UnaryFunction<Observable<MinimalSubscriptionResponse<T>>, Observable<WSSubscriptionMemoryOutput<T>>> {
  return pipe(
    scan(
      (state, json: MinimalSubscriptionResponse<T>) => {
        // The seed deliberately carries no map: it is created here, on the first message, so the shared seed object is
        // never mutated and every subscription starts from an empty memory.
        const memory = state.memory ?? new Map<string, T>();

        // An initial message replaces the world, so there is nothing to diff against; otherwise collect per-key changes.
        const diff: DiffMap<T> | undefined = json.initial ? undefined : new Map();

        if (json.initial) {
          memory.clear();
        }

        for (const update of json.data) {
          const key = getUniqueKey(update);

          // We assume that any item present in the json.data array, if it is an update and not an addition, is different than its previous revision.
          // `prev` must be read before memory is modified below.
          // NOTE(review): for removals `next` is the removal item itself rather than undefined -- confirm this is what consumers expect.
          diff?.set(key, { prev: memory.get(key), next: update });

          // A per-item action takes precedence over the message-level action.
          const action = update['UpdateAction'] ?? json.action;
          if (action === UpdateActionEnum.Remove) {
            memory.delete(key);
          } else {
            memory.set(key, update);
          }
        }

        return { memory, latestMessage: json, diff };
      },
      {
        memory: undefined as Map<string, T> | undefined,
        latestMessage: undefined as MinimalSubscriptionResponse<T> | undefined,
        diff: undefined as DiffMap<T> | undefined,
      }
    ),
    filter(isWSSubscriptionMemoryOutput), // only forwards messages if latestMessage is defined (and memory)
    shareReplay(1) // keep the observable hot and replay the latest message on new subscriber
  );
}

/**
 * Type guard telling a fully populated memory output apart from a partial one.
 * @param value a possibly incomplete output object.
 * @returns true when both `memory` and `latestMessage` are neither null nor undefined.
 */
function isWSSubscriptionMemoryOutput<T>(
  value: Partial<WSSubscriptionMemoryOutput<T>>
): value is WSSubscriptionMemoryOutput<T> {
  const { memory, latestMessage } = value;

  // Loose equality on purpose: `== null` matches both null and undefined, nothing else.
  if (memory == null) {
    return false;
  }
  return !(latestMessage == null);
}
